diff --git a/README.md b/README.md index f63414f..a8ed17b 100644 --- a/README.md +++ b/README.md @@ -1,169 +1,160 @@ -# GoTiDB - 时序数据库 +# GoTiDB - 轻量级时序数据库 -GoTiDB 是一个用 Go 语言编写的轻量级时序数据库,专门用于存储和查询时间序列数据。它支持高效的数据写入、查询和实时数据推送功能。 +GoTiDB 是一个用 Go 语言编写的轻量级时序数据库,专为高效存储和查询时间序列数据而设计。它提供了简单而强大的 API,支持高吞吐量的数据写入和灵活的查询功能。 -## 特性 +## 功能特点 -- 高性能内存存储引擎 -- WAL(预写日志)持久化 -- REST API 接口 -- WebSocket 实时数据推送 -- NATS 消息系统集成 -- Prometheus 指标监控 -- 支持自定义标签的数据点 -- 环形缓冲区数据结构 -- 支持多种查询类型(最新值、所有值、持续时间) +- **高效存储**: 使用基于文件的存储引擎,针对时间序列数据进行了优化 +- **灵活查询**: 支持原始数据查询、最新值查询和聚合查询 +- **标签索引**: 使用多维标签索引,支持按标签快速过滤数据 +- **时间窗口**: 高效的时间窗口索引,加速时间范围查询 +- **数据压缩**: 支持自动压缩旧数据,节省存储空间 +- **数据保留**: 自动清理过期数据,支持配置保留策略 +- **并发安全**: 支持多个并发读写操作 +- **可扩展**: 模块化设计,易于扩展和定制 ## 安装 -确保你已经安装了 Go 1.16 或更高版本。 - ```bash -git clone git.pyer.club/kingecg/gotidb -cd gotidb -go mod download +go get git.pyer.club/kingecg/gotidb ``` -## 构建 +## 快速开始 -```bash -go build -o gotidb cmd/server/main.go +以下是一个简单的示例,展示如何使用 GoTiDB: + +```go +package main + +import ( + "context" + "fmt" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" + _ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎 +) + +func main() { + // 创建引擎配置 + config := &engine.FileEngineConfig{ + DataDir: "/path/to/data", + SegmentSize: 1024 * 1024, // 1MB + MaxSegments: 10, + WriteBufferSize: 1000, + } + + // 创建引擎 + e, err := engine.NewEngine(engine.EngineConfig{ + Type: "file", + FileConfig: config, + }) + if err != nil { + fmt.Printf("Failed to create engine: %v\n", err) + return + } + + // 打开引擎 + if err := e.Open(); err != nil { + fmt.Printf("Failed to open engine: %v\n", err) + return + } + defer e.Close() + + // 写入数据 + points := []engine.DataPoint{ + { + Timestamp: time.Now().UnixNano(), + Value: 42.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + }, + }, + } + + ctx := context.Background() + if err := e.Write(ctx, points); err != nil { + fmt.Printf("Failed to write points: %v\n", err) + return + } + + // 查询数据 + query := engine.Query{ + Type: engine.QueryTypeRaw, + StartTime: time.Now().Add(-time.Hour).UnixNano(), + EndTime: time.Now().UnixNano(), + Tags: map[string]string{ + "host": "server1", + }, + } + + result, err := e.Query(ctx, query) + if err != nil { + fmt.Printf("Failed to query: %v\n", err) + return + } + + // 处理查询结果 + for _, series := range result { + fmt.Printf("Series ID: %s\n", series.SeriesID) + for _, point := range series.Points { + fmt.Printf(" Timestamp: %s, Value: %f\n", + time.Unix(0, point.Timestamp).Format(time.RFC3339), + point.Value) + } + } +} ``` -## 运行 +更多示例请参考 [examples](./examples) 目录。 -```bash -./gotidb [options] -``` +## 配置选项 -### 可用选项 +### 文件引擎配置 -- `-rest-addr`: REST API 服务地址(默认:":8080") -- `-ws-addr`: WebSocket 服务地址(默认:":8081") -- `-metrics-addr`: 指标服务地址(默认:":8082") -- `-quic-addr`: QUIC 服务地址(默认:":8083") -- `-nats-url`: NATS 服务器地址(默认:"nats://localhost:4222") -- `-persistence`: 持久化类型(none, wal, boltdb)(默认:"none") -- `-persistence-dir`: 持久化目录(默认:"./data") -- `-sync-every`: 每写入多少条数据同步一次(默认:100) -- `-config`: 配置文件路径(默认:"config.yaml") +| 选项 | 描述 | 默认值 | +|------|------|--------| +| DataDir | 数据存储目录 | 必填 | +| SegmentSize | 段文件大小限制(字节) | 64MB | +| MaxSegments | 最大段文件数量 | 100 | +| WriteBufferSize | 写入缓冲区大小(数据点数) | 1000 | +| IndexCacheSize | 索引缓存大小(字节) | 32MB | +| UseCompression | 是否启用压缩 | false | +| CompressionLevel | 压缩级别(0-9) | 6 | +| CompactThreshold | 触发压缩的阈值(段文件数量比例) | 0.7 | +| MaxOpenFiles | 最大打开文件数 | 100 | +| SyncWrites | 是否同步写入(更安全但更慢) | false | +| RetentionPeriod | 数据保留时间 | 30d | -### 持久化选项 +## 性能考虑 -GoTiDB 支持多种持久化方式: +- **写入性能**: 使用写入缓冲区和异步刷新可以提高写入性能 +- **查询性能**: 使用标签索引和时间窗口索引加速查询 +- **存储效率**: 启用压缩可以减少存储空间占用,但会增加 CPU 使用率 +- **内存使用**: 调整索引缓存大小可以平衡内存使用和查询性能 +- **文件描述符**: 调整最大打开文件数以适应系统限制 -1. **内存存储(无持久化)**:数据仅保存在内存中,服务重启后数据丢失。 - - 配置:`-persistence=none` +## 架构 -2. **WAL 日志持久化**:使用预写日志(Write-Ahead Log)进行持久化,支持数据恢复。 - - 配置:`-persistence=wal -persistence-dir=./data -sync-every=100` +GoTiDB 的核心架构包括: -3. **BoltDB 持久化**:使用 BoltDB 进行持久化,提供更高的可靠性和查询性能。 - - 配置:`-persistence=boltdb -persistence-dir=./data` - - 配置文件中可设置:`boltdb_filename`(数据库文件名)和 `boltdb_bucket_size`(数据分桶大小) +1. **引擎接口**: 定义了存储引擎的通用接口 +2. **文件引擎**: 基于文件系统的存储引擎实现 +3. **索引管理**: 标签索引和时间窗口索引 +4. **查询处理**: 原始查询、最新值查询和聚合查询 +5. **后台任务**: 数据压缩和过期数据清理 -## API 使用 +## 贡献 -### REST API +欢迎贡献代码、报告问题或提出改进建议!请遵循以下步骤: -#### 写入数据 - -```bash -curl -X POST http://localhost:8080/api/v1/write \ - -H "Content-Type: application/json" \ - -d '{ - "device_id": "device1", - "metric_code": "temperature", - "labels": { - "location": "room1" - }, - "value": 25.5 - }' -``` - -#### 批量写入数据 - -```bash -curl -X POST http://localhost:8080/api/v1/batch_write \ - -H "Content-Type: application/json" \ - -d '{ - "points": [ - { - "device_id": "device1", - "metric_code": "temperature", - "labels": { - "location": "room1" - }, - "value": 25.5 - }, - { - "device_id": "device2", - "metric_code": "humidity", - "labels": { - "location": "room2" - }, - "value": 60 - } - ] - }' -``` - -#### 查询数据 - -```bash -curl -X POST http://localhost:8080/api/v1/query \ - -H "Content-Type: application/json" \ - -d '{ - "device_id": "device1", - "metric_code": "temperature", - "labels": { - "location": "room1" - }, - "query_type": "latest" - }' -``` - -### WebSocket API - -连接 WebSocket 服务: - -```javascript -const ws = new WebSocket('ws://localhost:8081/ws'); - -// 订阅数据点 -ws.send(JSON.stringify({ - device_id: "device1", - metric_code: "temperature", - labels: { - location: "room1" - } -})); - -// 接收数据更新 -ws.onmessage = function(event) { - const data = JSON.parse(event.data); - console.log('Received update:', data); -}; -``` - -## 监控 - -访问 `http://localhost:8082/metrics` 查看 Prometheus 指标。 - -可用指标: - -- `gotidb_write_total`: 写入操作总数 -- `gotidb_query_total`: 查询操作总数 -- `gotidb_write_latency_seconds`: 写入操作延迟 -- `gotidb_query_latency_seconds`: 查询操作延迟 -- `gotidb_active_connections`: 活跃连接数 -- `gotidb_data_points_count`: 数据点数量 -- `gotidb_persistence_latency_seconds`: 持久化操作延迟 -- `gotidb_persistence_errors_total`: 持久化错误总数 -- `gotidb_messaging_latency_seconds`: 消息操作延迟 -- `gotidb_messaging_errors_total`: 消息错误总数 -- `gotidb_websocket_connections`: WebSocket 连接数 +1. Fork 项目 +2. 创建功能分支 (`git checkout -b feature/amazing-feature`) +3. 提交更改 (`git commit -m 'Add some amazing feature'`) +4. 推送到分支 (`git push origin feature/amazing-feature`) +5. 创建 Pull Request ## 许可证 -MIT License \ No newline at end of file +本项目采用 MIT 许可证 - 详见 [LICENSE](LICENSE) 文件。 \ No newline at end of file diff --git a/cmd/server/main.go b/cmd/server/main.go index 0ac09f6..2230f16 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -138,7 +138,7 @@ func main() { quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置 } - quicServer, err := api.NewQUICServer(dataManager, quicConfig) + quicServer, err = api.NewQUICServer(dataManager, quicConfig) if err != nil { log.Printf("Failed to create QUIC server: %v", err) log.Println("Continuing without QUIC server") diff --git a/docs/design/engine-design.md b/docs/design/engine-design.md new file mode 100644 index 0000000..5b072b1 --- /dev/null +++ b/docs/design/engine-design.md @@ -0,0 +1,450 @@ +# 存储引擎设计文档 + +## 1. 概述 + +GoTiDB存储引擎抽象层旨在提供统一的接口,使不同的存储后端可以无缝集成到系统中。本文档描述了存储引擎的设计原则、接口定义和实现建议。 + +## 2. 设计目标 + +- **抽象统一**: 提供一致的API,隐藏不同存储引擎的实现细节 +- **可扩展性**: 支持添加新的存储引擎而无需修改核心代码 +- **性能优化**: 针对时序数据的特点进行优化 +- **可配置性**: 允许通过配置调整引擎行为 + +## 3. 存储引擎接口 + +### 3.1 核心接口 + +```go +// Engine 是所有存储引擎必须实现的基础接口 +type Engine interface { + // 基本生命周期 + Open() error + Close() error + + // 数据操作 + WritePoint(ctx context.Context, point DataPoint) error + WriteBatch(ctx context.Context, points []DataPoint) error + + // 查询操作 + Query(ctx context.Context, query Query) (QueryResult, error) + + // 管理操作 + Flush() error + Compact() error + + // 监控 + Stats() EngineStats + + // 能力查询 + Capabilities() EngineCapabilities +} +``` + +### 3.2 扩展接口 + +特定引擎可以实现额外接口来提供特殊功能: + +```go +// PersistentEngine 提供持久化功能 +type PersistentEngine interface { + Engine + Backup(path string) error + Restore(path string) error +} + +// ReplicatedEngine 提供复制功能 +type ReplicatedEngine interface { + Engine + AddReplica(addr string) error + RemoveReplica(addr string) error +} +``` + +## 4. 统一查询接口 + +所有读操作通过统一的Query接口实现,提供灵活性和一致性: + +```go +// Query 定义查询参数 +type Query struct { + // 查询类型 + Type QueryType + + // 时间范围 + StartTime int64 + EndTime int64 + + // 序列标识 + SeriesID string + DeviceID string + MetricCode string + + // 标签过滤 + TagFilters []TagFilter + + // 聚合选项 + Aggregation AggregationType + AggInterval time.Duration + IncludeRawData bool + + // 结果限制 + Limit int + Offset int + + // 其他查询选项 + Options map[string]interface{} +} + +// QueryType 定义查询类型 +type QueryType int + +const ( + // 原始数据查询 + QueryTypeRaw QueryType = iota + + // 聚合查询 + QueryTypeAggregate + + // 最新值查询 + QueryTypeLatest + + // 标签查询 + QueryTypeTags + + // 元数据查询 + QueryTypeMetadata +) + +// TagFilter 定义标签过滤条件 +type TagFilter struct { + Key string + Operator FilterOperator + Value string +} + +// FilterOperator 定义过滤操作符 +type FilterOperator int + +const ( + OpEqual FilterOperator = iota + OpNotEqual + OpRegex + OpGreaterThan + OpLessThan + // 更多操作符... +) + +// AggregationType 定义聚合类型 +type AggregationType int + +const ( + AggNone AggregationType = iota + AggSum + AggAvg + AggMin + AggMax + AggCount + // 更多聚合类型... +) +``` + +### 4.1 查询结果 + +```go +// QueryResult 定义查询结果 +type QueryResult interface { + // 结果类型 + Type() QueryType +} + +// TimeSeriesResult 定义时间序列查询结果 +type TimeSeriesResult struct { + SeriesID string + Points []DataPoint +} + +// AggregateResult 定义聚合查询结果 +type AggregateResult struct { + SeriesID string + Groups []AggregateGroup +} + +type AggregateGroup struct { + StartTime int64 + EndTime int64 + Value float64 + Count int +} +``` + +### 4.2 查询构建器 + +为了简化查询构建,提供流式API: + +```go +query := NewQueryBuilder(). + ForMetric("cpu.usage"). + WithTimeRange(startTime, endTime). + WithTag("host", OpEqual, "server01"). + WithAggregation(AggAvg, 5*time.Minute). + Build() +``` + +## 5. 配置抽象 + +```go +type EngineConfig interface { + // 通用配置方法 + WithMaxRetention(duration time.Duration) EngineConfig + WithMaxPoints(points int) EngineConfig + WithFlushInterval(interval time.Duration) EngineConfig + + // 获取特定引擎的配置 + MemoryConfig() *MemoryEngineConfig + FileConfig() *FileEngineConfig + // 其他引擎... +} + +// 内存引擎特定配置 +type MemoryEngineConfig struct { + MaxPointsPerSeries int // 可配置的保留点数,替代硬编码的30 + UseCompression bool + // 其他内存引擎特定参数... +} +``` + +## 6. 引擎注册机制 + +```go +// EngineRegistry 管理所有可用的存储引擎 +type EngineRegistry struct { + engines map[string]EngineFactory +} + +// EngineFactory 创建存储引擎实例 +type EngineFactory func(config EngineConfig) (Engine, error) + +// 注册新引擎 +func (r *EngineRegistry) Register(name string, factory EngineFactory) { + r.engines[name] = factory +} + +// 创建引擎实例 +func (r *EngineRegistry) Create(name string, config EngineConfig) (Engine, error) { + if factory, ok := r.engines[name]; ok { + return factory(config) + } + return nil, fmt.Errorf("unknown engine: %s", name) +} +``` + +## 7. 性能优化建议 + +### 7.1 写入路径优化 + +实现写入缓冲区合并小批量写入: + +```go +type WriteBuffer struct { + points map[string][]DataPoint // 按序列ID分组 + mu sync.Mutex + maxSize int + flushCh chan struct{} + engine Engine +} + +func (wb *WriteBuffer) Add(point DataPoint) { + wb.mu.Lock() + seriesID := point.SeriesID() + wb.points[seriesID] = append(wb.points[seriesID], point) + size := len(wb.points) + wb.mu.Unlock() + + if size >= wb.maxSize { + wb.Flush() + } +} + +func (wb *WriteBuffer) Flush() { + wb.mu.Lock() + points := wb.points + wb.points = make(map[string][]DataPoint) + wb.mu.Unlock() + + // 批量写入引擎 + wb.engine.WriteBatch(context.Background(), points) +} +``` + +### 7.2 并发控制优化 + +实现分片锁减少锁竞争: + +```go +type ShardedLock struct { + locks []sync.RWMutex + shardMask uint64 +} + +func NewShardedLock(shards int) *ShardedLock { + // 确保分片数是2的幂 + shards = nextPowerOfTwo(shards) + return &ShardedLock{ + locks: make([]sync.RWMutex, shards), + shardMask: uint64(shards - 1), + } +} + +func (sl *ShardedLock) getLockForKey(key string) *sync.RWMutex { + h := fnv.New64() + h.Write([]byte(key)) + hashVal := h.Sum64() + return &sl.locks[hashVal&sl.shardMask] +} + +func (sl *ShardedLock) Lock(key string) { + sl.getLockForKey(key).Lock() +} + +func (sl *ShardedLock) Unlock(key string) { + sl.getLockForKey(key).Unlock() +} +``` + +### 7.3 内存优化 + +实现时序数据的紧凑存储: + +```go +// 紧凑存储时间戳和值 +type CompactTimeSeriesBlock struct { + baseTime int64 + deltaEncode []byte // 使用delta编码存储时间戳 + values []byte // 压缩存储的值 +} + +func NewCompactBlock(baseTime int64, capacity int) *CompactTimeSeriesBlock { + return &CompactTimeSeriesBlock{ + baseTime: baseTime, + deltaEncode: make([]byte, 0, capacity*binary.MaxVarintLen64), + values: make([]byte, 0, capacity*8), // 假设double值 + } +} + +func (b *CompactTimeSeriesBlock) AddPoint(timestamp int64, value float64) { + // 存储时间增量 + delta := timestamp - b.baseTime + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutVarint(buf, delta) + b.deltaEncode = append(b.deltaEncode, buf[:n]...) + + // 存储值 + bits := math.Float64bits(value) + buf = make([]byte, 8) + binary.LittleEndian.PutUint64(buf, bits) + b.values = append(b.values, buf...) +} +``` + +### 7.4 查询优化 + +实现时间范围索引: + +```go +type TimeRangeIndex struct { + // 每个时间窗口的起始位置 + windows []timeWindow + blockSize int64 // 时间窗口大小,如1小时 +} + +type timeWindow struct { + startTime int64 + endTime int64 + offset int // 数据块中的偏移 +} + +func (idx *TimeRangeIndex) FindBlocks(start, end int64) []int { + var result []int + for i, window := range idx.windows { + if window.endTime >= start && window.startTime <= end { + result = append(result, i) + } + } + return result +} +``` + +## 8. 实现路线图 + +1. **定义核心接口** + - 实现Engine接口 + - 定义Query和QueryResult结构 + +2. **重构现有引擎** + - 调整内存引擎以实现新接口 + - 使MaxPointsPerSeries可配置 + +3. **实现查询构建器** + - 创建流式API构建查询 + +4. **添加性能优化** + - 实现写入缓冲区 + - 添加分片锁 + - 优化内存使用 + +5. **实现引擎注册机制** + - 创建EngineRegistry + - 支持动态引擎选择 + +6. **添加监控和统计** + - 实现Stats接口 + - 收集性能指标 + +## 9. 使用示例 + +```go +// 创建引擎 +registry := NewEngineRegistry() +registry.Register("memory", NewMemoryEngine) +registry.Register("file", NewFileEngine) + +config := NewEngineConfig(). + WithMaxRetention(24 * time.Hour). + WithMaxPoints(1000) + +engine, err := registry.Create("memory", config) +if err != nil { + log.Fatal(err) +} + +// 写入数据 +point := DataPoint{ + DeviceID: "device1", + MetricCode: "temperature", + Labels: map[string]string{"location": "room1"}, + Value: 25.5, + Timestamp: time.Now().UnixNano(), +} +err = engine.WritePoint(context.Background(), point) + +// 查询数据 +query := NewQueryBuilder(). + ForMetric("temperature"). + WithTimeRange(startTime, endTime). + WithTag("location", OpEqual, "room1"). + Build() + +result, err := engine.Query(context.Background(), query) +if err != nil { + log.Fatal(err) +} + +// 处理结果 +if tsResult, ok := result.(*TimeSeriesResult); ok { + for _, point := range tsResult.Points { + fmt.Printf("Time: %v, Value: %v\n", + time.Unix(0, point.Timestamp), point.Value) + } +} +``` \ No newline at end of file diff --git a/examples/engine/main.go b/examples/engine/main.go new file mode 100644 index 0000000..76d7737 --- /dev/null +++ b/examples/engine/main.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" + "git.pyer.club/kingecg/gotidb/pkg/engine/memory" +) + +func main() { + // 创建引擎注册表 + registry := engine.NewEngineRegistry() + + // 注册内存引擎 + memory.Register(registry) + + // 创建引擎配置 + config := engine.NewEngineConfig(). + WithMaxRetention(24 * time.Hour). + WithMaxPoints(1000) + + // 创建内存引擎实例 + eng, err := registry.Create("memory", config) + if err != nil { + log.Fatal("Failed to create engine:", err) + } + + // 打开引擎 + if err := eng.Open(); err != nil { + log.Fatal("Failed to open engine:", err) + } + defer eng.Close() + + // 写入一些测试数据 + deviceID := "device001" + metricCode := "temperature" + now := time.Now() + + // 写入单个数据点 + point := engine.DataPoint{ + DeviceID: deviceID, + MetricCode: metricCode, + Labels: map[string]string{ + "location": "room1", + "floor": "1st", + }, + Value: 25.5, + Timestamp: now.UnixNano(), + } + + if err := eng.WritePoint(context.Background(), point); err != nil { + log.Fatal("Failed to write point:", err) + } + + // 写入一批数据点 + var points []engine.DataPoint + for i := 0; i < 10; i++ { + points = append(points, engine.DataPoint{ + DeviceID: deviceID, + MetricCode: metricCode, + Labels: map[string]string{ + "location": "room1", + "floor": "1st", + }, + Value: 25.5 + float64(i), + Timestamp: now.Add(time.Duration(i) * time.Second).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + log.Fatal("Failed to write batch:", err) + } + + // 查询最新数据 + latestQuery := engine.NewQueryBuilder(). + ForMetric(metricCode). + WithTag("location", engine.OpEqual, "room1"). + Build() + latestQuery.Type = engine.QueryTypeLatest + + result, err := eng.Query(context.Background(), latestQuery) + if err != nil { + log.Fatal("Failed to query latest data:", err) + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Println("\nLatest data:") + for _, p := range tsResult.Points { + fmt.Printf("Time: %v, Value: %.2f\n", + time.Unix(0, p.Timestamp).Format(time.RFC3339), + p.Value) + } + } + + // 查询原始数据 + rawQuery := engine.NewQueryBuilder(). + ForMetric(metricCode). + WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()). + WithTag("location", engine.OpEqual, "room1"). + Build() + + result, err = eng.Query(context.Background(), rawQuery) + if err != nil { + log.Fatal("Failed to query raw data:", err) + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Println("\nRaw data:") + for _, p := range tsResult.Points { + fmt.Printf("Time: %v, Value: %.2f\n", + time.Unix(0, p.Timestamp).Format(time.RFC3339), + p.Value) + } + } + + // 查询聚合数据 + aggQuery := engine.NewQueryBuilder(). + ForMetric(metricCode). + WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()). + WithTag("location", engine.OpEqual, "room1"). + WithAggregation(engine.AggAvg, 5*time.Minute). + Build() + + result, err = eng.Query(context.Background(), aggQuery) + if err != nil { + log.Fatal("Failed to query aggregate data:", err) + } + + if aggResult, ok := result.(*engine.AggregateResult); ok { + fmt.Println("\nAggregate data (5-minute averages):") + for _, g := range aggResult.Groups { + fmt.Printf("Time range: %v - %v, Average: %.2f, Count: %d\n", + time.Unix(0, g.StartTime).Format(time.RFC3339), + time.Unix(0, g.EndTime).Format(time.RFC3339), + g.Value, + g.Count) + } + } + + // 打印引擎统计信息 + stats := eng.Stats() + fmt.Printf("\nEngine stats:\n") + fmt.Printf("Points count: %d\n", stats.PointsCount) + fmt.Printf("Last write time: %v\n", stats.LastWriteTime.Format(time.RFC3339)) + + // 打印引擎能力 + caps := eng.Capabilities() + fmt.Printf("\nEngine capabilities:\n") + fmt.Printf("Supports compression: %v\n", caps.SupportsCompression) + fmt.Printf("Supports persistence: %v\n", caps.SupportsPersistence) + fmt.Printf("Supports replication: %v\n", caps.SupportsReplication) + fmt.Printf("Max concurrent writes: %d\n", caps.MaxConcurrentWrites) +} diff --git a/examples/engine/multi_engine.go b/examples/engine/multi_engine.go new file mode 100644 index 0000000..14f8b76 --- /dev/null +++ b/examples/engine/multi_engine.go @@ -0,0 +1,263 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" + "git.pyer.club/kingecg/gotidb/pkg/engine/file" + "git.pyer.club/kingecg/gotidb/pkg/engine/memory" +) + +func main() { + // 创建引擎注册表 + registry := engine.NewEngineRegistry() + + // 注册内存引擎和文件引擎 + memory.Register(registry) + file.Register(registry) + + // 创建临时目录用于文件引擎 + tempDir, err := os.MkdirTemp("", "gotidb_example_*") + if err != nil { + log.Fatal("Failed to create temp dir:", err) + } + defer os.RemoveAll(tempDir) + + // 创建内存引擎配置 + memConfig := engine.NewEngineConfig(). + WithMaxRetention(24 * time.Hour). + WithMaxPoints(1000) + + // 创建文件引擎配置 + fileConfig := engine.NewEngineConfig() + fileConfig.SetFileConfig(&engine.FileEngineConfig{ + DataDir: tempDir, + SegmentSize: 1024 * 1024, // 1MB + CompactWindow: time.Hour, + MaxSegments: 10, + UseCompression: true, + CompressionLevel: 6, + }) + + // 创建内存引擎和文件引擎实例 + memEng, err := registry.Create("memory", memConfig) + if err != nil { + log.Fatal("Failed to create memory engine:", err) + } + + fileEng, err := registry.Create("file", fileConfig) + if err != nil { + log.Fatal("Failed to create file engine:", err) + } + + // 打开引擎 + if err := memEng.Open(); err != nil { + log.Fatal("Failed to open memory engine:", err) + } + defer memEng.Close() + + if err := fileEng.Open(); err != nil { + log.Fatal("Failed to open file engine:", err) + } + defer fileEng.Close() + + // 演示不同场景下的引擎使用 + + // 场景1:高频写入,短期存储 - 使用内存引擎 + fmt.Println("\n=== 场景1:高频写入,短期存储(内存引擎)===") + demoHighFrequencyWrites(memEng) + + // 场景2:长期存储,历史数据查询 - 使用文件引擎 + fmt.Println("\n=== 场景2:长期存储,历史数据查询(文件引擎)===") + demoHistoricalData(fileEng) + + // 场景3:聚合查询性能对比 + fmt.Println("\n=== 场景3:聚合查询性能对比 ===") + demoAggregationComparison(memEng, fileEng) + + // 打印引擎统计信息 + printEngineStats("Memory Engine", memEng) + printEngineStats("File Engine", fileEng) +} + +// 演示高频写入场景 +func demoHighFrequencyWrites(eng engine.Engine) { + start := time.Now() + count := 1000 + + // 批量写入数据 + var points []engine.DataPoint + for i := 0; i < count; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "sensor001", + MetricCode: "temperature", + Labels: map[string]string{ + "location": "room1", + "floor": "1st", + }, + Value: 25.5 + float64(i%10), + Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write batch: %v", err) + return + } + + duration := time.Since(start) + fmt.Printf("写入 %d 个数据点耗时: %v (%.2f points/sec)\n", + count, duration, float64(count)/duration.Seconds()) + + // 查询最新数据 + query := engine.NewQueryBuilder(). + ForMetric("temperature"). + WithTag("location", engine.OpEqual, "room1"). + Build() + query.Type = engine.QueryTypeLatest + + result, err := eng.Query(context.Background(), query) + if err != nil { + log.Printf("Failed to query latest data: %v", err) + return + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Printf("最新数据点: %.2f (时间: %v)\n", + tsResult.Points[0].Value, + time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339)) + } +} + +// 演示历史数据存储和查询场景 +func demoHistoricalData(eng engine.Engine) { + // 写入跨越多个时间段的数据 + now := time.Now() + var points []engine.DataPoint + for i := 0; i < 24; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "sensor002", + MetricCode: "power", + Labels: map[string]string{ + "device": "solar_panel", + "unit": "watts", + }, + Value: 100 + float64(i*50), + Timestamp: now.Add(time.Duration(-i) * time.Hour).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write historical data: %v", err) + return + } + + // 查询24小时内的数据 + query := engine.NewQueryBuilder(). + ForMetric("power"). + WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()). + WithTag("device", engine.OpEqual, "solar_panel"). + Build() + + result, err := eng.Query(context.Background(), query) + if err != nil { + log.Printf("Failed to query historical data: %v", err) + return + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Printf("24小时内的数据点数量: %d\n", len(tsResult.Points)) + if len(tsResult.Points) > 0 { + fmt.Printf("最早数据点: %.2f (时间: %v)\n", + tsResult.Points[0].Value, + time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339)) + fmt.Printf("最新数据点: %.2f (时间: %v)\n", + tsResult.Points[len(tsResult.Points)-1].Value, + time.Unix(0, tsResult.Points[len(tsResult.Points)-1].Timestamp).Format(time.RFC3339)) + } + } +} + +// 演示聚合查询性能对比 +func demoAggregationComparison(memEng, fileEng engine.Engine) { + // 准备测试数据 + now := time.Now() + var points []engine.DataPoint + for i := 0; i < 1000; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "sensor003", + MetricCode: "cpu_usage", + Labels: map[string]string{ + "host": "server1", + }, + Value: float64(30 + (i % 40)), + Timestamp: now.Add(time.Duration(-i) * time.Minute).UnixNano(), + }) + } + + // 写入两个引擎 + if err := memEng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write to memory engine: %v", err) + return + } + + if err := fileEng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write to file engine: %v", err) + return + } + + // 创建聚合查询 + query := engine.NewQueryBuilder(). + ForMetric("cpu_usage"). + WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()). + WithTag("host", engine.OpEqual, "server1"). + WithAggregation(engine.AggAvg, 1*time.Hour). + Build() + + // 测试内存引擎聚合性能 + memStart := time.Now() + memResult, err := memEng.Query(context.Background(), query) + if err != nil { + log.Printf("Memory engine aggregation failed: %v", err) + return + } + memDuration := time.Since(memStart) + + // 测试文件引擎聚合性能 + fileStart := time.Now() + fileResult, err := fileEng.Query(context.Background(), query) + if err != nil { + log.Printf("File engine aggregation failed: %v", err) + return + } + fileDuration := time.Since(fileStart) + + // 打印性能对比 + fmt.Printf("内存引擎聚合查询耗时: %v\n", memDuration) + fmt.Printf("文件引擎聚合查询耗时: %v\n", fileDuration) + + if memAgg, ok := memResult.(*engine.AggregateResult); ok { + fmt.Printf("内存引擎聚合组数: %d\n", len(memAgg.Groups)) + } + if fileAgg, ok := fileResult.(*engine.AggregateResult); ok { + fmt.Printf("文件引擎聚合组数: %d\n", len(fileAgg.Groups)) + } +} + +// 打印引擎统计信息 +func printEngineStats(name string, eng engine.Engine) { + stats := eng.Stats() + caps := eng.Capabilities() + + fmt.Printf("\n=== %s 统计信息 ===\n", name) + fmt.Printf("数据点总数: %d\n", stats.PointsCount) + fmt.Printf("最后写入时间: %v\n", stats.LastWriteTime.Format(time.RFC3339)) + fmt.Printf("支持压缩: %v\n", caps.SupportsCompression) + fmt.Printf("支持持久化: %v\n", caps.SupportsPersistence) + fmt.Printf("支持复制: %v\n", caps.SupportsReplication) + fmt.Printf("最大并发写入: %d\n", caps.MaxConcurrentWrites) +} diff --git a/examples/file_engine_example.go b/examples/file_engine_example.go new file mode 100644 index 0000000..3f95a3c --- /dev/null +++ b/examples/file_engine_example.go @@ -0,0 +1,200 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" + _ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎以注册 +) + +func main() { + // 创建临时目录 + tempDir, err := os.MkdirTemp("", "gotidb_example") + if err != nil { + fmt.Printf("Failed to create temp dir: %v\n", err) + return + } + defer os.RemoveAll(tempDir) + + // 创建引擎配置 + config := &engine.FileEngineConfig{ + DataDir: tempDir, + SegmentSize: 1024 * 1024, // 1MB + MaxSegments: 10, + WriteBufferSize: 1000, + IndexCacheSize: 1024 * 1024, // 1MB + UseCompression: false, + CompressionLevel: 0, + CompactThreshold: 0.7, + MaxOpenFiles: 100, + SyncWrites: true, + RetentionPeriod: 24 * time.Hour, + } + + // 创建引擎 + e, err := engine.NewEngine(engine.EngineConfig{ + Type: "file", + FileConfig: config, + }) + if err != nil { + fmt.Printf("Failed to create file engine: %v\n", err) + return + } + + // 打开引擎 + if err := e.Open(); err != nil { + fmt.Printf("Failed to open engine: %v\n", err) + return + } + defer e.Close() + + // 创建上下文 + ctx := context.Background() + + // 写入测试数据 + fmt.Println("Writing data points...") + points := []engine.DataPoint{ + { + Timestamp: time.Now().UnixNano(), + Value: 1.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + "app": "web", + }, + }, + { + Timestamp: time.Now().Add(time.Second).UnixNano(), + Value: 2.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + "app": "web", + }, + }, + { + Timestamp: time.Now().Add(2 * time.Second).UnixNano(), + Value: 3.0, + Labels: map[string]string{ + "host": "server2", + "region": "us-east", + "app": "api", + }, + }, + { + Timestamp: time.Now().Add(3 * time.Second).UnixNano(), + Value: 4.0, + Labels: map[string]string{ + "host": "server2", + "region": "us-east", + "app": "api", + }, + }, + } + + // 写入数据 + if err := e.Write(ctx, points); err != nil { + fmt.Printf("Failed to write points: %v\n", err) + return + } + + // 查询原始数据 + fmt.Println("\nQuerying raw data for server1...") + rawQuery := engine.Query{ + Type: engine.QueryTypeRaw, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + Tags: map[string]string{ + "host": "server1", + }, + Limit: 10, + } + + rawResult, err := e.Query(ctx, rawQuery) + if err != nil { + fmt.Printf("Failed to query raw data: %v\n", err) + return + } + + // 打印原始查询结果 + fmt.Printf("Raw query returned %d series\n", len(rawResult)) + for i, series := range rawResult { + fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) + fmt.Printf(" Labels: %v\n", series.Points[0].Labels) + fmt.Printf(" Points: %d\n", len(series.Points)) + for j, point := range series.Points { + fmt.Printf(" Point %d: timestamp=%s, value=%f\n", + j+1, + time.Unix(0, point.Timestamp).Format(time.RFC3339Nano), + point.Value) + } + } + + // 查询最新数据 + fmt.Println("\nQuerying latest data for each host...") + latestQuery := engine.Query{ + Type: engine.QueryTypeLatest, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + Tags: map[string]string{}, // 空标签查询所有序列 + } + + latestResult, err := e.Query(ctx, latestQuery) + if err != nil { + fmt.Printf("Failed to query latest data: %v\n", err) + return + } + + // 打印最新查询结果 + fmt.Printf("Latest query returned %d series\n", len(latestResult)) + for i, series := range latestResult { + fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) + fmt.Printf(" Labels: %v\n", series.Points[0].Labels) + for _, point := range series.Points { + fmt.Printf(" Latest point: timestamp=%s, value=%f\n", + time.Unix(0, point.Timestamp).Format(time.RFC3339Nano), + point.Value) + } + } + + // 查询聚合数据 + fmt.Println("\nQuerying aggregate data (average) for each region...") + aggQuery := engine.Query{ + Type: engine.QueryTypeAggregate, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + AggregateType: engine.AggregateTypeAvg, + Tags: map[string]string{}, // 空标签查询所有序列 + } + + aggResult, err := e.Query(ctx, aggQuery) + if err != nil { + fmt.Printf("Failed to query aggregate data: %v\n", err) + return + } + + // 打印聚合查询结果 + fmt.Printf("Aggregate query returned %d series\n", len(aggResult)) + for i, series := range aggResult { + fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) + fmt.Printf(" Labels: %v\n", series.Points[0].Labels) + for _, point := range series.Points { + fmt.Printf(" Average value: %f\n", point.Value) + } + } + + // 获取引擎统计信息 + stats := e.Stats() + fmt.Println("\nEngine statistics:") + fmt.Printf(" Points count: %d\n", stats.PointsCount) + fmt.Printf(" Segments count: %d\n", stats.SegmentsCount) + fmt.Printf(" Last write time: %s\n", stats.LastWriteTime.Format(time.RFC3339)) + if !stats.LastCompactionTime.IsZero() { + fmt.Printf(" Last compaction time: %s\n", stats.LastCompactionTime.Format(time.RFC3339)) + } else { + fmt.Printf(" Last compaction time: Never\n") + } +} diff --git a/go.mod b/go.mod index eee35cb..8168b38 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/stretchr/testify v1.10.0 ) -require go.etcd.io/bbolt v1.4.1 // indirect +require go.etcd.io/bbolt v1.4.1 require ( github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 5df72a2..50e4cf9 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= @@ -128,15 +127,11 @@ golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= diff --git a/pkg/storage/boltdb.go b/pkg/storage/boltdb.go index 0663217..ebe3cea 100644 --- a/pkg/storage/boltdb.go +++ b/pkg/storage/boltdb.go @@ -15,7 +15,7 @@ import ( const ( // PersistenceTypeBoltDB BoltDB持久化类型 - PersistenceTypeBoltDB PersistenceType = "boltdb" + // PersistenceTypeBoltDB PersistenceType = "boltdb" // 默认bucket名称 devicesBucketName = "devices" @@ -296,7 +296,7 @@ func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]mod // ReadDuration 读取指定时间范围内的数据 func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) { - deviceKey := id.String() + // deviceKey := id.String() // 从数据库读取所有数据 values, err := e.Read(ctx, id)