docs(README): 更新项目文档并重构 README 结构

- 重写 README.md,优化项目介绍、功能特点和快速开始等内容
- 添加存储引擎设计文档
- 新增多引擎示例和性能对比演示
- 更新代码示例,展示如何使用 GoTiDB
This commit is contained in:
程广 2025-06-12 15:43:18 +08:00
parent f132492348
commit c12a5c9c0e
9 changed files with 1208 additions and 153 deletions

277
README.md
View File

@ -1,169 +1,160 @@
# GoTiDB - 时序数据库 # GoTiDB - 轻量级时序数据库
GoTiDB 是一个用 Go 语言编写的轻量级时序数据库,专门用于存储和查询时间序列数据。它支持高效的数据写入、查询和实时数据推送功能。 GoTiDB 是一个用 Go 语言编写的轻量级时序数据库,专为高效存储和查询时间序列数据而设计。它提供了简单而强大的 API支持高吞吐量的数据写入和灵活的查询功能。
## 特性 ## 功能特点
- 高性能内存存储引擎 - **高效存储**: 使用基于文件的存储引擎,针对时间序列数据进行了优化
- WAL预写日志持久化 - **灵活查询**: 支持原始数据查询、最新值查询和聚合查询
- REST API 接口 - **标签索引**: 使用多维标签索引,支持按标签快速过滤数据
- WebSocket 实时数据推送 - **时间窗口**: 高效的时间窗口索引,加速时间范围查询
- NATS 消息系统集成 - **数据压缩**: 支持自动压缩旧数据,节省存储空间
- Prometheus 指标监控 - **数据保留**: 自动清理过期数据,支持配置保留策略
- 支持自定义标签的数据点 - **并发安全**: 支持多个并发读写操作
- 环形缓冲区数据结构 - **可扩展**: 模块化设计,易于扩展和定制
- 支持多种查询类型(最新值、所有值、持续时间)
## 安装 ## 安装
确保你已经安装了 Go 1.16 或更高版本。
```bash ```bash
git clone git.pyer.club/kingecg/gotidb go get git.pyer.club/kingecg/gotidb
cd gotidb
go mod download
``` ```
## 构建 ## 快速开始
```bash 以下是一个简单的示例,展示如何使用 GoTiDB
go build -o gotidb cmd/server/main.go
```go
package main
import (
"context"
"fmt"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
_ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎
)
func main() {
// 创建引擎配置
config := &engine.FileEngineConfig{
DataDir: "/path/to/data",
SegmentSize: 1024 * 1024, // 1MB
MaxSegments: 10,
WriteBufferSize: 1000,
}
// 创建引擎
e, err := engine.NewEngine(engine.EngineConfig{
Type: "file",
FileConfig: config,
})
if err != nil {
fmt.Printf("Failed to create engine: %v\n", err)
return
}
// 打开引擎
if err := e.Open(); err != nil {
fmt.Printf("Failed to open engine: %v\n", err)
return
}
defer e.Close()
// 写入数据
points := []engine.DataPoint{
{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{
"host": "server1",
"region": "us-west",
},
},
}
ctx := context.Background()
if err := e.Write(ctx, points); err != nil {
fmt.Printf("Failed to write points: %v\n", err)
return
}
// 查询数据
query := engine.Query{
Type: engine.QueryTypeRaw,
StartTime: time.Now().Add(-time.Hour).UnixNano(),
EndTime: time.Now().UnixNano(),
Tags: map[string]string{
"host": "server1",
},
}
result, err := e.Query(ctx, query)
if err != nil {
fmt.Printf("Failed to query: %v\n", err)
return
}
// 处理查询结果
for _, series := range result {
fmt.Printf("Series ID: %s\n", series.SeriesID)
for _, point := range series.Points {
fmt.Printf(" Timestamp: %s, Value: %f\n",
time.Unix(0, point.Timestamp).Format(time.RFC3339),
point.Value)
}
}
}
``` ```
## 运行 更多示例请参考 [examples](./examples) 目录。
```bash ## 配置选项
./gotidb [options]
```
### 可用选项 ### 文件引擎配置
- `-rest-addr`: REST API 服务地址(默认:":8080" | 选项 | 描述 | 默认值 |
- `-ws-addr`: WebSocket 服务地址(默认:":8081" |------|------|--------|
- `-metrics-addr`: 指标服务地址(默认:":8082" | DataDir | 数据存储目录 | 必填 |
- `-quic-addr`: QUIC 服务地址(默认:":8083" | SegmentSize | 段文件大小限制(字节) | 64MB |
- `-nats-url`: NATS 服务器地址(默认:"nats://localhost:4222" | MaxSegments | 最大段文件数量 | 100 |
- `-persistence`: 持久化类型none, wal, boltdb默认"none" | WriteBufferSize | 写入缓冲区大小(数据点数) | 1000 |
- `-persistence-dir`: 持久化目录(默认:"./data" | IndexCacheSize | 索引缓存大小(字节) | 32MB |
- `-sync-every`: 每写入多少条数据同步一次默认100 | UseCompression | 是否启用压缩 | false |
- `-config`: 配置文件路径(默认:"config.yaml" | CompressionLevel | 压缩级别0-9 | 6 |
| CompactThreshold | 触发压缩的阈值(段文件数量比例) | 0.7 |
| MaxOpenFiles | 最大打开文件数 | 100 |
| SyncWrites | 是否同步写入(更安全但更慢) | false |
| RetentionPeriod | 数据保留时间 | 30d |
### 持久化选项 ## 性能考虑
GoTiDB 支持多种持久化方式: - **写入性能**: 使用写入缓冲区和异步刷新可以提高写入性能
- **查询性能**: 使用标签索引和时间窗口索引加速查询
- **存储效率**: 启用压缩可以减少存储空间占用,但会增加 CPU 使用率
- **内存使用**: 调整索引缓存大小可以平衡内存使用和查询性能
- **文件描述符**: 调整最大打开文件数以适应系统限制
1. **内存存储(无持久化)**:数据仅保存在内存中,服务重启后数据丢失。 ## 架构
- 配置:`-persistence=none`
2. **WAL 日志持久化**使用预写日志Write-Ahead Log进行持久化支持数据恢复。 GoTiDB 的核心架构包括:
- 配置:`-persistence=wal -persistence-dir=./data -sync-every=100`
3. **BoltDB 持久化**:使用 BoltDB 进行持久化,提供更高的可靠性和查询性能。 1. **引擎接口**: 定义了存储引擎的通用接口
- 配置:`-persistence=boltdb -persistence-dir=./data` 2. **文件引擎**: 基于文件系统的存储引擎实现
- 配置文件中可设置:`boltdb_filename`(数据库文件名)和 `boltdb_bucket_size`(数据分桶大小) 3. **索引管理**: 标签索引和时间窗口索引
4. **查询处理**: 原始查询、最新值查询和聚合查询
5. **后台任务**: 数据压缩和过期数据清理
## API 使用 ## 贡献
### REST API 欢迎贡献代码、报告问题或提出改进建议!请遵循以下步骤:
#### 写入数据 1. Fork 项目
2. 创建功能分支 (`git checkout -b feature/amazing-feature`)
```bash 3. 提交更改 (`git commit -m 'Add some amazing feature'`)
curl -X POST http://localhost:8080/api/v1/write \ 4. 推送到分支 (`git push origin feature/amazing-feature`)
-H "Content-Type: application/json" \ 5. 创建 Pull Request
-d '{
"device_id": "device1",
"metric_code": "temperature",
"labels": {
"location": "room1"
},
"value": 25.5
}'
```
#### 批量写入数据
```bash
curl -X POST http://localhost:8080/api/v1/batch_write \
-H "Content-Type: application/json" \
-d '{
"points": [
{
"device_id": "device1",
"metric_code": "temperature",
"labels": {
"location": "room1"
},
"value": 25.5
},
{
"device_id": "device2",
"metric_code": "humidity",
"labels": {
"location": "room2"
},
"value": 60
}
]
}'
```
#### 查询数据
```bash
curl -X POST http://localhost:8080/api/v1/query \
-H "Content-Type: application/json" \
-d '{
"device_id": "device1",
"metric_code": "temperature",
"labels": {
"location": "room1"
},
"query_type": "latest"
}'
```
### WebSocket API
连接 WebSocket 服务:
```javascript
const ws = new WebSocket('ws://localhost:8081/ws');
// 订阅数据点
ws.send(JSON.stringify({
device_id: "device1",
metric_code: "temperature",
labels: {
location: "room1"
}
}));
// 接收数据更新
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Received update:', data);
};
```
## 监控
访问 `http://localhost:8082/metrics` 查看 Prometheus 指标。
可用指标:
- `gotidb_write_total`: 写入操作总数
- `gotidb_query_total`: 查询操作总数
- `gotidb_write_latency_seconds`: 写入操作延迟
- `gotidb_query_latency_seconds`: 查询操作延迟
- `gotidb_active_connections`: 活跃连接数
- `gotidb_data_points_count`: 数据点数量
- `gotidb_persistence_latency_seconds`: 持久化操作延迟
- `gotidb_persistence_errors_total`: 持久化错误总数
- `gotidb_messaging_latency_seconds`: 消息操作延迟
- `gotidb_messaging_errors_total`: 消息错误总数
- `gotidb_websocket_connections`: WebSocket 连接数
## 许可证 ## 许可证
MIT License 本项目采用 MIT 许可证 - 详见 [LICENSE](LICENSE) 文件。

View File

@ -138,7 +138,7 @@ func main() {
quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置 quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置
} }
quicServer, err := api.NewQUICServer(dataManager, quicConfig) quicServer, err = api.NewQUICServer(dataManager, quicConfig)
if err != nil { if err != nil {
log.Printf("Failed to create QUIC server: %v", err) log.Printf("Failed to create QUIC server: %v", err)
log.Println("Continuing without QUIC server") log.Println("Continuing without QUIC server")

View File

@ -0,0 +1,450 @@
# 存储引擎设计文档
## 1. 概述
GoTiDB存储引擎抽象层旨在提供统一的接口使不同的存储后端可以无缝集成到系统中。本文档描述了存储引擎的设计原则、接口定义和实现建议。
## 2. 设计目标
- **抽象统一**: 提供一致的API隐藏不同存储引擎的实现细节
- **可扩展性**: 支持添加新的存储引擎而无需修改核心代码
- **性能优化**: 针对时序数据的特点进行优化
- **可配置性**: 允许通过配置调整引擎行为
## 3. 存储引擎接口
### 3.1 核心接口
```go
// Engine 是所有存储引擎必须实现的基础接口
type Engine interface {
// 基本生命周期
Open() error
Close() error
// 数据操作
WritePoint(ctx context.Context, point DataPoint) error
WriteBatch(ctx context.Context, points []DataPoint) error
// 查询操作
Query(ctx context.Context, query Query) (QueryResult, error)
// 管理操作
Flush() error
Compact() error
// 监控
Stats() EngineStats
// 能力查询
Capabilities() EngineCapabilities
}
```
### 3.2 扩展接口
特定引擎可以实现额外接口来提供特殊功能:
```go
// PersistentEngine 提供持久化功能
type PersistentEngine interface {
Engine
Backup(path string) error
Restore(path string) error
}
// ReplicatedEngine 提供复制功能
type ReplicatedEngine interface {
Engine
AddReplica(addr string) error
RemoveReplica(addr string) error
}
```
## 4. 统一查询接口
所有读操作通过统一的Query接口实现提供灵活性和一致性
```go
// Query 定义查询参数
type Query struct {
// 查询类型
Type QueryType
// 时间范围
StartTime int64
EndTime int64
// 序列标识
SeriesID string
DeviceID string
MetricCode string
// 标签过滤
TagFilters []TagFilter
// 聚合选项
Aggregation AggregationType
AggInterval time.Duration
IncludeRawData bool
// 结果限制
Limit int
Offset int
// 其他查询选项
Options map[string]interface{}
}
// QueryType 定义查询类型
type QueryType int
const (
// 原始数据查询
QueryTypeRaw QueryType = iota
// 聚合查询
QueryTypeAggregate
// 最新值查询
QueryTypeLatest
// 标签查询
QueryTypeTags
// 元数据查询
QueryTypeMetadata
)
// TagFilter 定义标签过滤条件
type TagFilter struct {
Key string
Operator FilterOperator
Value string
}
// FilterOperator 定义过滤操作符
type FilterOperator int
const (
OpEqual FilterOperator = iota
OpNotEqual
OpRegex
OpGreaterThan
OpLessThan
// 更多操作符...
)
// AggregationType 定义聚合类型
type AggregationType int
const (
AggNone AggregationType = iota
AggSum
AggAvg
AggMin
AggMax
AggCount
// 更多聚合类型...
)
```
### 4.1 查询结果
```go
// QueryResult 定义查询结果
type QueryResult interface {
// 结果类型
Type() QueryType
}
// TimeSeriesResult 定义时间序列查询结果
type TimeSeriesResult struct {
SeriesID string
Points []DataPoint
}
// AggregateResult 定义聚合查询结果
type AggregateResult struct {
SeriesID string
Groups []AggregateGroup
}
type AggregateGroup struct {
StartTime int64
EndTime int64
Value float64
Count int
}
```
### 4.2 查询构建器
为了简化查询构建提供流式API
```go
query := NewQueryBuilder().
ForMetric("cpu.usage").
WithTimeRange(startTime, endTime).
WithTag("host", OpEqual, "server01").
WithAggregation(AggAvg, 5*time.Minute).
Build()
```
## 5. 配置抽象
```go
type EngineConfig interface {
// 通用配置方法
WithMaxRetention(duration time.Duration) EngineConfig
WithMaxPoints(points int) EngineConfig
WithFlushInterval(interval time.Duration) EngineConfig
// 获取特定引擎的配置
MemoryConfig() *MemoryEngineConfig
FileConfig() *FileEngineConfig
// 其他引擎...
}
// 内存引擎特定配置
type MemoryEngineConfig struct {
MaxPointsPerSeries int // 可配置的保留点数替代硬编码的30
UseCompression bool
// 其他内存引擎特定参数...
}
```
## 6. 引擎注册机制
```go
// EngineRegistry 管理所有可用的存储引擎
type EngineRegistry struct {
engines map[string]EngineFactory
}
// EngineFactory 创建存储引擎实例
type EngineFactory func(config EngineConfig) (Engine, error)
// 注册新引擎
func (r *EngineRegistry) Register(name string, factory EngineFactory) {
r.engines[name] = factory
}
// 创建引擎实例
func (r *EngineRegistry) Create(name string, config EngineConfig) (Engine, error) {
if factory, ok := r.engines[name]; ok {
return factory(config)
}
return nil, fmt.Errorf("unknown engine: %s", name)
}
```
## 7. 性能优化建议
### 7.1 写入路径优化
实现写入缓冲区合并小批量写入:
```go
type WriteBuffer struct {
points map[string][]DataPoint // 按序列ID分组
mu sync.Mutex
maxSize int
flushCh chan struct{}
engine Engine
}
func (wb *WriteBuffer) Add(point DataPoint) {
wb.mu.Lock()
seriesID := point.SeriesID()
wb.points[seriesID] = append(wb.points[seriesID], point)
size := len(wb.points)
wb.mu.Unlock()
if size >= wb.maxSize {
wb.Flush()
}
}
func (wb *WriteBuffer) Flush() {
wb.mu.Lock()
points := wb.points
wb.points = make(map[string][]DataPoint)
wb.mu.Unlock()
// 批量写入引擎
wb.engine.WriteBatch(context.Background(), points)
}
```
### 7.2 并发控制优化
实现分片锁减少锁竞争:
```go
type ShardedLock struct {
locks []sync.RWMutex
shardMask uint64
}
func NewShardedLock(shards int) *ShardedLock {
// 确保分片数是2的幂
shards = nextPowerOfTwo(shards)
return &ShardedLock{
locks: make([]sync.RWMutex, shards),
shardMask: uint64(shards - 1),
}
}
func (sl *ShardedLock) getLockForKey(key string) *sync.RWMutex {
h := fnv.New64()
h.Write([]byte(key))
hashVal := h.Sum64()
return &sl.locks[hashVal&sl.shardMask]
}
func (sl *ShardedLock) Lock(key string) {
sl.getLockForKey(key).Lock()
}
func (sl *ShardedLock) Unlock(key string) {
sl.getLockForKey(key).Unlock()
}
```
### 7.3 内存优化
实现时序数据的紧凑存储:
```go
// 紧凑存储时间戳和值
type CompactTimeSeriesBlock struct {
baseTime int64
deltaEncode []byte // 使用delta编码存储时间戳
values []byte // 压缩存储的值
}
func NewCompactBlock(baseTime int64, capacity int) *CompactTimeSeriesBlock {
return &CompactTimeSeriesBlock{
baseTime: baseTime,
deltaEncode: make([]byte, 0, capacity*binary.MaxVarintLen64),
values: make([]byte, 0, capacity*8), // 假设double值
}
}
func (b *CompactTimeSeriesBlock) AddPoint(timestamp int64, value float64) {
// 存储时间增量
delta := timestamp - b.baseTime
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(buf, delta)
b.deltaEncode = append(b.deltaEncode, buf[:n]...)
// 存储值
bits := math.Float64bits(value)
buf = make([]byte, 8)
binary.LittleEndian.PutUint64(buf, bits)
b.values = append(b.values, buf...)
}
```
### 7.4 查询优化
实现时间范围索引:
```go
type TimeRangeIndex struct {
// 每个时间窗口的起始位置
windows []timeWindow
blockSize int64 // 时间窗口大小如1小时
}
type timeWindow struct {
startTime int64
endTime int64
offset int // 数据块中的偏移
}
func (idx *TimeRangeIndex) FindBlocks(start, end int64) []int {
var result []int
for i, window := range idx.windows {
if window.endTime >= start && window.startTime <= end {
result = append(result, i)
}
}
return result
}
```
## 8. 实现路线图
1. **定义核心接口**
- 实现Engine接口
- 定义Query和QueryResult结构
2. **重构现有引擎**
- 调整内存引擎以实现新接口
- 使MaxPointsPerSeries可配置
3. **实现查询构建器**
- 创建流式API构建查询
4. **添加性能优化**
- 实现写入缓冲区
- 添加分片锁
- 优化内存使用
5. **实现引擎注册机制**
- 创建EngineRegistry
- 支持动态引擎选择
6. **添加监控和统计**
- 实现Stats接口
- 收集性能指标
## 9. 使用示例
```go
// 创建引擎
registry := NewEngineRegistry()
registry.Register("memory", NewMemoryEngine)
registry.Register("file", NewFileEngine)
config := NewEngineConfig().
WithMaxRetention(24 * time.Hour).
WithMaxPoints(1000)
engine, err := registry.Create("memory", config)
if err != nil {
log.Fatal(err)
}
// 写入数据
point := DataPoint{
DeviceID: "device1",
MetricCode: "temperature",
Labels: map[string]string{"location": "room1"},
Value: 25.5,
Timestamp: time.Now().UnixNano(),
}
err = engine.WritePoint(context.Background(), point)
// 查询数据
query := NewQueryBuilder().
ForMetric("temperature").
WithTimeRange(startTime, endTime).
WithTag("location", OpEqual, "room1").
Build()
result, err := engine.Query(context.Background(), query)
if err != nil {
log.Fatal(err)
}
// 处理结果
if tsResult, ok := result.(*TimeSeriesResult); ok {
for _, point := range tsResult.Points {
fmt.Printf("Time: %v, Value: %v\n",
time.Unix(0, point.Timestamp), point.Value)
}
}
```

156
examples/engine/main.go Normal file
View File

@ -0,0 +1,156 @@
package main
import (
"context"
"fmt"
"log"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
"git.pyer.club/kingecg/gotidb/pkg/engine/memory"
)
func main() {
// 创建引擎注册表
registry := engine.NewEngineRegistry()
// 注册内存引擎
memory.Register(registry)
// 创建引擎配置
config := engine.NewEngineConfig().
WithMaxRetention(24 * time.Hour).
WithMaxPoints(1000)
// 创建内存引擎实例
eng, err := registry.Create("memory", config)
if err != nil {
log.Fatal("Failed to create engine:", err)
}
// 打开引擎
if err := eng.Open(); err != nil {
log.Fatal("Failed to open engine:", err)
}
defer eng.Close()
// 写入一些测试数据
deviceID := "device001"
metricCode := "temperature"
now := time.Now()
// 写入单个数据点
point := engine.DataPoint{
DeviceID: deviceID,
MetricCode: metricCode,
Labels: map[string]string{
"location": "room1",
"floor": "1st",
},
Value: 25.5,
Timestamp: now.UnixNano(),
}
if err := eng.WritePoint(context.Background(), point); err != nil {
log.Fatal("Failed to write point:", err)
}
// 写入一批数据点
var points []engine.DataPoint
for i := 0; i < 10; i++ {
points = append(points, engine.DataPoint{
DeviceID: deviceID,
MetricCode: metricCode,
Labels: map[string]string{
"location": "room1",
"floor": "1st",
},
Value: 25.5 + float64(i),
Timestamp: now.Add(time.Duration(i) * time.Second).UnixNano(),
})
}
if err := eng.WriteBatch(context.Background(), points); err != nil {
log.Fatal("Failed to write batch:", err)
}
// 查询最新数据
latestQuery := engine.NewQueryBuilder().
ForMetric(metricCode).
WithTag("location", engine.OpEqual, "room1").
Build()
latestQuery.Type = engine.QueryTypeLatest
result, err := eng.Query(context.Background(), latestQuery)
if err != nil {
log.Fatal("Failed to query latest data:", err)
}
if tsResult, ok := result.(*engine.TimeSeriesResult); ok {
fmt.Println("\nLatest data:")
for _, p := range tsResult.Points {
fmt.Printf("Time: %v, Value: %.2f\n",
time.Unix(0, p.Timestamp).Format(time.RFC3339),
p.Value)
}
}
// 查询原始数据
rawQuery := engine.NewQueryBuilder().
ForMetric(metricCode).
WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()).
WithTag("location", engine.OpEqual, "room1").
Build()
result, err = eng.Query(context.Background(), rawQuery)
if err != nil {
log.Fatal("Failed to query raw data:", err)
}
if tsResult, ok := result.(*engine.TimeSeriesResult); ok {
fmt.Println("\nRaw data:")
for _, p := range tsResult.Points {
fmt.Printf("Time: %v, Value: %.2f\n",
time.Unix(0, p.Timestamp).Format(time.RFC3339),
p.Value)
}
}
// 查询聚合数据
aggQuery := engine.NewQueryBuilder().
ForMetric(metricCode).
WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()).
WithTag("location", engine.OpEqual, "room1").
WithAggregation(engine.AggAvg, 5*time.Minute).
Build()
result, err = eng.Query(context.Background(), aggQuery)
if err != nil {
log.Fatal("Failed to query aggregate data:", err)
}
if aggResult, ok := result.(*engine.AggregateResult); ok {
fmt.Println("\nAggregate data (5-minute averages):")
for _, g := range aggResult.Groups {
fmt.Printf("Time range: %v - %v, Average: %.2f, Count: %d\n",
time.Unix(0, g.StartTime).Format(time.RFC3339),
time.Unix(0, g.EndTime).Format(time.RFC3339),
g.Value,
g.Count)
}
}
// 打印引擎统计信息
stats := eng.Stats()
fmt.Printf("\nEngine stats:\n")
fmt.Printf("Points count: %d\n", stats.PointsCount)
fmt.Printf("Last write time: %v\n", stats.LastWriteTime.Format(time.RFC3339))
// 打印引擎能力
caps := eng.Capabilities()
fmt.Printf("\nEngine capabilities:\n")
fmt.Printf("Supports compression: %v\n", caps.SupportsCompression)
fmt.Printf("Supports persistence: %v\n", caps.SupportsPersistence)
fmt.Printf("Supports replication: %v\n", caps.SupportsReplication)
fmt.Printf("Max concurrent writes: %d\n", caps.MaxConcurrentWrites)
}

View File

@ -0,0 +1,263 @@
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
"git.pyer.club/kingecg/gotidb/pkg/engine/file"
"git.pyer.club/kingecg/gotidb/pkg/engine/memory"
)
func main() {
// 创建引擎注册表
registry := engine.NewEngineRegistry()
// 注册内存引擎和文件引擎
memory.Register(registry)
file.Register(registry)
// 创建临时目录用于文件引擎
tempDir, err := os.MkdirTemp("", "gotidb_example_*")
if err != nil {
log.Fatal("Failed to create temp dir:", err)
}
defer os.RemoveAll(tempDir)
// 创建内存引擎配置
memConfig := engine.NewEngineConfig().
WithMaxRetention(24 * time.Hour).
WithMaxPoints(1000)
// 创建文件引擎配置
fileConfig := engine.NewEngineConfig()
fileConfig.SetFileConfig(&engine.FileEngineConfig{
DataDir: tempDir,
SegmentSize: 1024 * 1024, // 1MB
CompactWindow: time.Hour,
MaxSegments: 10,
UseCompression: true,
CompressionLevel: 6,
})
// 创建内存引擎和文件引擎实例
memEng, err := registry.Create("memory", memConfig)
if err != nil {
log.Fatal("Failed to create memory engine:", err)
}
fileEng, err := registry.Create("file", fileConfig)
if err != nil {
log.Fatal("Failed to create file engine:", err)
}
// 打开引擎
if err := memEng.Open(); err != nil {
log.Fatal("Failed to open memory engine:", err)
}
defer memEng.Close()
if err := fileEng.Open(); err != nil {
log.Fatal("Failed to open file engine:", err)
}
defer fileEng.Close()
// 演示不同场景下的引擎使用
// 场景1高频写入短期存储 - 使用内存引擎
fmt.Println("\n=== 场景1高频写入短期存储内存引擎===")
demoHighFrequencyWrites(memEng)
// 场景2长期存储历史数据查询 - 使用文件引擎
fmt.Println("\n=== 场景2长期存储历史数据查询文件引擎===")
demoHistoricalData(fileEng)
// 场景3聚合查询性能对比
fmt.Println("\n=== 场景3聚合查询性能对比 ===")
demoAggregationComparison(memEng, fileEng)
// 打印引擎统计信息
printEngineStats("Memory Engine", memEng)
printEngineStats("File Engine", fileEng)
}
// 演示高频写入场景
func demoHighFrequencyWrites(eng engine.Engine) {
start := time.Now()
count := 1000
// 批量写入数据
var points []engine.DataPoint
for i := 0; i < count; i++ {
points = append(points, engine.DataPoint{
DeviceID: "sensor001",
MetricCode: "temperature",
Labels: map[string]string{
"location": "room1",
"floor": "1st",
},
Value: 25.5 + float64(i%10),
Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond).UnixNano(),
})
}
if err := eng.WriteBatch(context.Background(), points); err != nil {
log.Printf("Failed to write batch: %v", err)
return
}
duration := time.Since(start)
fmt.Printf("写入 %d 个数据点耗时: %v (%.2f points/sec)\n",
count, duration, float64(count)/duration.Seconds())
// 查询最新数据
query := engine.NewQueryBuilder().
ForMetric("temperature").
WithTag("location", engine.OpEqual, "room1").
Build()
query.Type = engine.QueryTypeLatest
result, err := eng.Query(context.Background(), query)
if err != nil {
log.Printf("Failed to query latest data: %v", err)
return
}
if tsResult, ok := result.(*engine.TimeSeriesResult); ok {
fmt.Printf("最新数据点: %.2f (时间: %v)\n",
tsResult.Points[0].Value,
time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339))
}
}
// 演示历史数据存储和查询场景
func demoHistoricalData(eng engine.Engine) {
// 写入跨越多个时间段的数据
now := time.Now()
var points []engine.DataPoint
for i := 0; i < 24; i++ {
points = append(points, engine.DataPoint{
DeviceID: "sensor002",
MetricCode: "power",
Labels: map[string]string{
"device": "solar_panel",
"unit": "watts",
},
Value: 100 + float64(i*50),
Timestamp: now.Add(time.Duration(-i) * time.Hour).UnixNano(),
})
}
if err := eng.WriteBatch(context.Background(), points); err != nil {
log.Printf("Failed to write historical data: %v", err)
return
}
// 查询24小时内的数据
query := engine.NewQueryBuilder().
ForMetric("power").
WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()).
WithTag("device", engine.OpEqual, "solar_panel").
Build()
result, err := eng.Query(context.Background(), query)
if err != nil {
log.Printf("Failed to query historical data: %v", err)
return
}
if tsResult, ok := result.(*engine.TimeSeriesResult); ok {
fmt.Printf("24小时内的数据点数量: %d\n", len(tsResult.Points))
if len(tsResult.Points) > 0 {
fmt.Printf("最早数据点: %.2f (时间: %v)\n",
tsResult.Points[0].Value,
time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339))
fmt.Printf("最新数据点: %.2f (时间: %v)\n",
tsResult.Points[len(tsResult.Points)-1].Value,
time.Unix(0, tsResult.Points[len(tsResult.Points)-1].Timestamp).Format(time.RFC3339))
}
}
}
// 演示聚合查询性能对比
func demoAggregationComparison(memEng, fileEng engine.Engine) {
// 准备测试数据
now := time.Now()
var points []engine.DataPoint
for i := 0; i < 1000; i++ {
points = append(points, engine.DataPoint{
DeviceID: "sensor003",
MetricCode: "cpu_usage",
Labels: map[string]string{
"host": "server1",
},
Value: float64(30 + (i % 40)),
Timestamp: now.Add(time.Duration(-i) * time.Minute).UnixNano(),
})
}
// 写入两个引擎
if err := memEng.WriteBatch(context.Background(), points); err != nil {
log.Printf("Failed to write to memory engine: %v", err)
return
}
if err := fileEng.WriteBatch(context.Background(), points); err != nil {
log.Printf("Failed to write to file engine: %v", err)
return
}
// 创建聚合查询
query := engine.NewQueryBuilder().
ForMetric("cpu_usage").
WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()).
WithTag("host", engine.OpEqual, "server1").
WithAggregation(engine.AggAvg, 1*time.Hour).
Build()
// 测试内存引擎聚合性能
memStart := time.Now()
memResult, err := memEng.Query(context.Background(), query)
if err != nil {
log.Printf("Memory engine aggregation failed: %v", err)
return
}
memDuration := time.Since(memStart)
// 测试文件引擎聚合性能
fileStart := time.Now()
fileResult, err := fileEng.Query(context.Background(), query)
if err != nil {
log.Printf("File engine aggregation failed: %v", err)
return
}
fileDuration := time.Since(fileStart)
// 打印性能对比
fmt.Printf("内存引擎聚合查询耗时: %v\n", memDuration)
fmt.Printf("文件引擎聚合查询耗时: %v\n", fileDuration)
if memAgg, ok := memResult.(*engine.AggregateResult); ok {
fmt.Printf("内存引擎聚合组数: %d\n", len(memAgg.Groups))
}
if fileAgg, ok := fileResult.(*engine.AggregateResult); ok {
fmt.Printf("文件引擎聚合组数: %d\n", len(fileAgg.Groups))
}
}
// 打印引擎统计信息
func printEngineStats(name string, eng engine.Engine) {
stats := eng.Stats()
caps := eng.Capabilities()
fmt.Printf("\n=== %s 统计信息 ===\n", name)
fmt.Printf("数据点总数: %d\n", stats.PointsCount)
fmt.Printf("最后写入时间: %v\n", stats.LastWriteTime.Format(time.RFC3339))
fmt.Printf("支持压缩: %v\n", caps.SupportsCompression)
fmt.Printf("支持持久化: %v\n", caps.SupportsPersistence)
fmt.Printf("支持复制: %v\n", caps.SupportsReplication)
fmt.Printf("最大并发写入: %d\n", caps.MaxConcurrentWrites)
}

View File

@ -0,0 +1,200 @@
package main
import (
"context"
"fmt"
"os"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
_ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎以注册
)
func main() {
// 创建临时目录
tempDir, err := os.MkdirTemp("", "gotidb_example")
if err != nil {
fmt.Printf("Failed to create temp dir: %v\n", err)
return
}
defer os.RemoveAll(tempDir)
// 创建引擎配置
config := &engine.FileEngineConfig{
DataDir: tempDir,
SegmentSize: 1024 * 1024, // 1MB
MaxSegments: 10,
WriteBufferSize: 1000,
IndexCacheSize: 1024 * 1024, // 1MB
UseCompression: false,
CompressionLevel: 0,
CompactThreshold: 0.7,
MaxOpenFiles: 100,
SyncWrites: true,
RetentionPeriod: 24 * time.Hour,
}
// 创建引擎
e, err := engine.NewEngine(engine.EngineConfig{
Type: "file",
FileConfig: config,
})
if err != nil {
fmt.Printf("Failed to create file engine: %v\n", err)
return
}
// 打开引擎
if err := e.Open(); err != nil {
fmt.Printf("Failed to open engine: %v\n", err)
return
}
defer e.Close()
// 创建上下文
ctx := context.Background()
// 写入测试数据
fmt.Println("Writing data points...")
points := []engine.DataPoint{
{
Timestamp: time.Now().UnixNano(),
Value: 1.0,
Labels: map[string]string{
"host": "server1",
"region": "us-west",
"app": "web",
},
},
{
Timestamp: time.Now().Add(time.Second).UnixNano(),
Value: 2.0,
Labels: map[string]string{
"host": "server1",
"region": "us-west",
"app": "web",
},
},
{
Timestamp: time.Now().Add(2 * time.Second).UnixNano(),
Value: 3.0,
Labels: map[string]string{
"host": "server2",
"region": "us-east",
"app": "api",
},
},
{
Timestamp: time.Now().Add(3 * time.Second).UnixNano(),
Value: 4.0,
Labels: map[string]string{
"host": "server2",
"region": "us-east",
"app": "api",
},
},
}
// 写入数据
if err := e.Write(ctx, points); err != nil {
fmt.Printf("Failed to write points: %v\n", err)
return
}
// 查询原始数据
fmt.Println("\nQuerying raw data for server1...")
rawQuery := engine.Query{
Type: engine.QueryTypeRaw,
StartTime: time.Now().Add(-time.Minute).UnixNano(),
EndTime: time.Now().Add(time.Minute).UnixNano(),
Tags: map[string]string{
"host": "server1",
},
Limit: 10,
}
rawResult, err := e.Query(ctx, rawQuery)
if err != nil {
fmt.Printf("Failed to query raw data: %v\n", err)
return
}
// 打印原始查询结果
fmt.Printf("Raw query returned %d series\n", len(rawResult))
for i, series := range rawResult {
fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID)
fmt.Printf(" Labels: %v\n", series.Points[0].Labels)
fmt.Printf(" Points: %d\n", len(series.Points))
for j, point := range series.Points {
fmt.Printf(" Point %d: timestamp=%s, value=%f\n",
j+1,
time.Unix(0, point.Timestamp).Format(time.RFC3339Nano),
point.Value)
}
}
// 查询最新数据
fmt.Println("\nQuerying latest data for each host...")
latestQuery := engine.Query{
Type: engine.QueryTypeLatest,
StartTime: time.Now().Add(-time.Minute).UnixNano(),
EndTime: time.Now().Add(time.Minute).UnixNano(),
Tags: map[string]string{}, // 空标签查询所有序列
}
latestResult, err := e.Query(ctx, latestQuery)
if err != nil {
fmt.Printf("Failed to query latest data: %v\n", err)
return
}
// 打印最新查询结果
fmt.Printf("Latest query returned %d series\n", len(latestResult))
for i, series := range latestResult {
fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID)
fmt.Printf(" Labels: %v\n", series.Points[0].Labels)
for _, point := range series.Points {
fmt.Printf(" Latest point: timestamp=%s, value=%f\n",
time.Unix(0, point.Timestamp).Format(time.RFC3339Nano),
point.Value)
}
}
// 查询聚合数据
fmt.Println("\nQuerying aggregate data (average) for each region...")
aggQuery := engine.Query{
Type: engine.QueryTypeAggregate,
StartTime: time.Now().Add(-time.Minute).UnixNano(),
EndTime: time.Now().Add(time.Minute).UnixNano(),
AggregateType: engine.AggregateTypeAvg,
Tags: map[string]string{}, // 空标签查询所有序列
}
aggResult, err := e.Query(ctx, aggQuery)
if err != nil {
fmt.Printf("Failed to query aggregate data: %v\n", err)
return
}
// 打印聚合查询结果
fmt.Printf("Aggregate query returned %d series\n", len(aggResult))
for i, series := range aggResult {
fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID)
fmt.Printf(" Labels: %v\n", series.Points[0].Labels)
for _, point := range series.Points {
fmt.Printf(" Average value: %f\n", point.Value)
}
}
// 获取引擎统计信息
stats := e.Stats()
fmt.Println("\nEngine statistics:")
fmt.Printf(" Points count: %d\n", stats.PointsCount)
fmt.Printf(" Segments count: %d\n", stats.SegmentsCount)
fmt.Printf(" Last write time: %s\n", stats.LastWriteTime.Format(time.RFC3339))
if !stats.LastCompactionTime.IsZero() {
fmt.Printf(" Last compaction time: %s\n", stats.LastCompactionTime.Format(time.RFC3339))
} else {
fmt.Printf(" Last compaction time: Never\n")
}
}

2
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
) )
require go.etcd.io/bbolt v1.4.1 // indirect require go.etcd.io/bbolt v1.4.1
require ( require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect

7
go.sum
View File

@ -108,8 +108,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
@ -128,15 +127,11 @@ golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0=
golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=

View File

@ -15,7 +15,7 @@ import (
const ( const (
// PersistenceTypeBoltDB BoltDB持久化类型 // PersistenceTypeBoltDB BoltDB持久化类型
PersistenceTypeBoltDB PersistenceType = "boltdb" // PersistenceTypeBoltDB PersistenceType = "boltdb"
// 默认bucket名称 // 默认bucket名称
devicesBucketName = "devices" devicesBucketName = "devices"
@ -296,7 +296,7 @@ func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]mod
// ReadDuration 读取指定时间范围内的数据 // ReadDuration 读取指定时间范围内的数据
func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) { func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) {
deviceKey := id.String() // deviceKey := id.String()
// 从数据库读取所有数据 // 从数据库读取所有数据
values, err := e.Read(ctx, id) values, err := e.Read(ctx, id)