// Package extension defines media track and codec identifiers, the Frame
// abstraction, and helpers that merge and dispatch frames.
package extension

import (
	"sync"
	"time"
)

// ==================== Media type definitions ====================

// TrackType identifies the kind of media carried by a track.
type TrackType int

// Track type values. TrackInvalid is -1; the remaining values count up from 0.
const (
	TrackInvalid TrackType = iota - 1
	TrackVideo
	TrackAudio
	TrackTitle
	TrackApplication
)

// String returns the lowercase name of the track type. Only video, audio and
// application have names; every other value, including TrackTitle, is reported
// as "invalid".
func (t TrackType) String() string {
	switch t {
	case TrackVideo:
		return "video"
	case TrackAudio:
		return "audio"
	case TrackApplication:
		return "application"
	default:
		return "invalid"
	}
}

// GetTrackType is the inverse of TrackType.String. Unrecognized names map to
// TrackInvalid.
func GetTrackType(str string) TrackType {
	switch str {
	case "video":
		return TrackVideo
	case "audio":
		return TrackAudio
	case "application":
		return TrackApplication
	default:
		return TrackInvalid
	}
}

// ==================== Codec identifiers ====================

// CodecID identifies a codec or container format.
type CodecID int

// Codec identifiers. CodecInvalid is -1; the remaining values count up from 0.
const (
	CodecInvalid CodecID = iota - 1
	CodecH264
	CodecH265
	CodecAAC
	CodecG711A
	CodecG711U
	CodecOpus
	CodecL16
	CodecVP8
	CodecVP9
	CodecAV1
	CodecJPEG
	CodecH266
	CodecTS
	CodecPS
	CodecMP3
	CodecADPCM
	CodecSVACV
	CodecSVACA
	CodecG722
	CodecG723
	CodecG728
	CodecG729
)

// codecNames maps each codec to the name returned by CodecID.String.
// NOTE(review): the names look like SDP encoding names — confirm with callers.
var codecNames = map[CodecID]string{
	CodecH264:  "H264",
	CodecH265:  "H265",
	CodecAAC:   "mpeg4-generic",
	CodecG711A: "PCMA",
	CodecG711U: "PCMU",
	CodecOpus:  "opus",
	CodecL16:   "L16",
	CodecVP8:   "VP8",
	CodecVP9:   "VP9",
	CodecAV1:   "AV1",
	CodecJPEG:  "JPEG",
	CodecH266:  "H266",
	CodecTS:    "TS",
	CodecPS:    "PS",
	CodecMP3:   "MP3",
	CodecADPCM: "ADPCM",
	CodecSVACV: "SVACV",
	CodecSVACA: "SVACA",
	CodecG722:  "G722",
	CodecG723:  "G723",
	CodecG728:  "G728",
	CodecG729:  "G729",
}

// codecTypes maps each codec to "video" or "audio".
var codecTypes = map[CodecID]string{
	CodecH264:  "video",
	CodecH265:  "video",
	CodecAAC:   "audio",
	CodecG711A: "audio",
	CodecG711U: "audio",
	CodecOpus:  "audio",
	CodecL16:   "audio",
	CodecVP8:   "video",
	CodecVP9:   "video",
	CodecAV1:   "video",
	CodecJPEG:  "video",
	CodecH266:  "video",
	CodecTS:    "video",
	CodecPS:    "video",
	CodecMP3:   "audio",
	CodecADPCM: "audio",
	CodecSVACV: "video",
	CodecSVACA: "audio",
	CodecG722:  "audio",
	CodecG723:  "audio",
	CodecG728:  "audio",
	CodecG729:  "audio",
}

// codecIDsByName is the reverse index of codecNames. It is built once so that
// GetCodecID is a single map lookup instead of a scan over every codec.
var codecIDsByName = func() map[string]CodecID {
	index := make(map[string]CodecID, len(codecNames))
	for id, name := range codecNames {
		index[name] = id
	}
	return index
}()

// Type returns "video" or "audio" for a known codec and "invalid" otherwise.
func (c CodecID) Type() string {
	if typ, ok := codecTypes[c]; ok {
		return typ
	}
	return "invalid"
}

// TrackType returns the track type that corresponds to Type, or TrackInvalid
// for an unknown codec.
func (c CodecID) TrackType() TrackType {
	switch c.Type() {
	case "video":
		return TrackVideo
	case "audio":
		return TrackAudio
	default:
		return TrackInvalid
	}
}

// String returns the codec's registered name, or "invalid" for an unknown
// codec.
func (c CodecID) String() string {
	if name, ok := codecNames[c]; ok {
		return name
	}
	return "invalid"
}

// GetCodecID is the inverse of CodecID.String. The match is exact
// (case-sensitive); unknown names map to CodecInvalid.
func GetCodecID(name string) CodecID {
	if id, ok := codecIDsByName[name]; ok {
		return id
	}
	return CodecInvalid
}

// ==================== Frame interface ====================

// Frame is the read-only view of one media frame used by the merger and the
// dispatcher in this file.
type Frame interface {
	// DTS and PTS return the frame's timestamps. FrameDispatcher interprets
	// DTS differences as milliseconds.
	DTS() uint64
	PTS() uint64
	// PrefixSize is the number of leading bytes of Data that FrameMerger
	// treats as a prefix (skipped in MergeMP4NalSize mode).
	PrefixSize() int
	KeyFrame() bool
	ConfigFrame() bool
	// CacheAble reports whether the frame can be retained as is; frames that
	// return false are copied by NewFrameCacheAble.
	CacheAble() bool
	DropAble() bool
	DecodeAble() bool
	Data() []byte
	Size() int
	CodecID() CodecID
	TrackType() TrackType
}

// ==================== Base frame implementation ====================

// BaseFrame stores the metadata shared by the concrete frame types.
type BaseFrame struct {
	Dts         uint64
	Pts         uint64
	VPrefixSize int
	Codec       CodecID
	Track       TrackType
}

// DTS returns the stored decode timestamp.
func (f *BaseFrame) DTS() uint64 { return f.Dts }

// PTS returns the stored presentation timestamp.
func (f *BaseFrame) PTS() uint64 { return f.Pts }

// PrefixSize returns the stored prefix length.
func (f *BaseFrame) PrefixSize() int { return f.VPrefixSize }

// CodecID returns the stored codec.
func (f *BaseFrame) CodecID() CodecID { return f.Codec }

// TrackType returns the stored track type.
func (f *BaseFrame) TrackType() TrackType { return f.Track }

// ==================== Frame wrapper ====================

// FrameFromBytes is a Frame backed by a caller-supplied byte slice. It reports
// CacheAble() == false, so NewFrameCacheAble copies its data.
type FrameFromBytes struct {
	*BaseFrame
	DataPtr  []byte
	IsKey    bool
	IsConfig bool
}

// KeyFrame reports the IsKey flag.
func (f *FrameFromBytes) KeyFrame() bool { return f.IsKey }

// ConfigFrame reports the IsConfig flag.
func (f *FrameFromBytes) ConfigFrame() bool { return f.IsConfig }

// CacheAble always returns false.
func (f *FrameFromBytes) CacheAble() bool { return false }

// DropAble always returns false.
func (f *FrameFromBytes) DropAble() bool { return false }

// DecodeAble returns true for every non-video frame; a video frame is
// decodable unless it is a config frame.
func (f *FrameFromBytes) DecodeAble() bool {
	if f.TrackType() != TrackVideo {
		return true
	}
	return !f.ConfigFrame()
}

// Data returns the backing slice without copying.
func (f *FrameFromBytes) Data() []byte { return f.DataPtr }

// Size returns len(Data()).
func (f *FrameFromBytes) Size() int { return len(f.DataPtr) }

// ==================== Cacheable frame ====================

// FrameCacheAble is a FrameFromBytes that reports CacheAble() == true.
type FrameCacheAble struct {
	*FrameFromBytes
	Buffer []byte
}

// NewFrameCacheAble returns a frame that reports CacheAble() == true.
//
// A frame that is already cacheable is returned unchanged unless forceKeyFrame
// asks for a key flag it does not have; in that case it is wrapped and its
// data is shared rather than copied. A non-cacheable frame is copied into a
// fresh buffer. The wrapper snapshots timestamps, prefix size, codec, track,
// key and config flags; DropAble and DecodeAble come from FrameFromBytes.
//
// frame must not be nil.
func NewFrameCacheAble(frame Frame, forceKeyFrame bool) Frame {
	cacheable := frame.CacheAble()
	if cacheable && (!forceKeyFrame || frame.KeyFrame()) {
		return frame
	}

	data := frame.Data()
	if !cacheable {
		// The source does not promise its bytes stay valid, so take a copy.
		data = make([]byte, frame.Size())
		copy(data, frame.Data())
	}

	return &FrameCacheAble{
		FrameFromBytes: &FrameFromBytes{
			BaseFrame: &BaseFrame{
				Dts:         frame.DTS(),
				Pts:         frame.PTS(),
				VPrefixSize: frame.PrefixSize(),
				Codec:       frame.CodecID(),
				Track:       frame.TrackType(),
			},
			DataPtr:  data,
			IsKey:    forceKeyFrame || frame.KeyFrame(),
			IsConfig: frame.ConfigFrame(),
		},
		Buffer: data,
	}
}

// CacheAble always returns true.
func (f *FrameCacheAble) CacheAble() bool { return true }

// ==================== Timestamp correction ====================

// FrameStamp wraps a Frame and overrides its DTS and PTS; every other method
// is forwarded to the embedded Frame.
type FrameStamp struct {
	Frame
	Dts int64
	Pts int64
}

// NewFrameStamp returns frame with its timestamps replaced by dts and pts.
func NewFrameStamp(frame Frame, dts, pts int64) Frame {
	return &FrameStamp{Frame: frame, Dts: dts, Pts: pts}
}

// DTS returns the override converted to uint64. A negative override wraps
// around to a very large value.
func (f *FrameStamp) DTS() uint64 { return uint64(f.Dts) }

// PTS returns the override converted to uint64. A negative override wraps
// around to a very large value.
func (f *FrameStamp) PTS() uint64 { return uint64(f.Pts) }

// ==================== Frame merger ====================

// FrameMerger collects consecutive H264/H265 frames and hands them to a
// callback as a single buffer. Frames of other codecs pass straight through.
// It is not safe for concurrent use.
type FrameMerger struct {
	frameCache     []Frame
	haveDecodeable bool // true once a decodable frame is in frameCache
	mergeType      int
}

// Merge modes accepted by NewFrameMerger.
const (
	// MergeNone concatenates the frames' data unchanged.
	MergeNone = iota
	// MergeH264Prefix concatenates the frames' data, inserting the start code
	// 00 00 00 01 in front of any frame whose PrefixSize is 0.
	MergeH264Prefix
	// MergeMP4NalSize replaces each frame's prefix with a 4-byte big-endian
	// length of the remaining payload.
	MergeMP4NalSize
)

// maxFrameCacheSize bounds the number of frames held before a flush is forced.
const maxFrameCacheSize = 100

// NewFrameMerger returns a merger operating in the given merge mode.
func NewFrameMerger(mergeType int) *FrameMerger {
	return &FrameMerger{mergeType: mergeType}
}

// WillFlush reports whether the frames cached so far must be emitted before
// frame is added. A nil frame means "end of input" and flushes whatever is
// cached. An empty cache never flushes.
func (m *FrameMerger) WillFlush(frame Frame) bool {
	if len(m.frameCache) == 0 {
		return false
	}
	if frame == nil {
		return true
	}

	last := m.frameCache[len(m.frameCache)-1]
	full := len(m.frameCache) > maxFrameCacheSize

	switch m.mergeType {
	case MergeNone:
		return full || last.DTS() != frame.DTS()
	case MergeH264Prefix, MergeMP4NalSize:
		if !m.haveDecodeable {
			// Only non-decodable frames are cached so far; keep them so they
			// are emitted together with the first decodable frame.
			return full
		}
		return full || last.DTS() != frame.DTS() || frame.DecodeAble() || frame.ConfigFrame()
	default:
		return true
	}
}

// DoMerge appends frame to *buffer using the merger's mode. In MergeMP4NalSize
// mode a frame whose PrefixSize is negative or larger than its data is skipped
// instead of causing a panic.
func (m *FrameMerger) DoMerge(buffer *[]byte, frame Frame) {
	switch m.mergeType {
	case MergeNone:
		*buffer = append(*buffer, frame.Data()...)
	case MergeH264Prefix:
		// Make sure every NAL unit carries its own start code.
		if frame.PrefixSize() == 0 {
			*buffer = append(*buffer, 0, 0, 0, 1)
		}
		*buffer = append(*buffer, frame.Data()...)
	case MergeMP4NalSize:
		data := frame.Data()
		prefix := frame.PrefixSize()
		if prefix < 0 || prefix > len(data) {
			return
		}
		payload := data[prefix:]
		size := uint32(len(payload))
		*buffer = append(*buffer, byte(size>>24), byte(size>>16), byte(size>>8), byte(size))
		*buffer = append(*buffer, payload...)
	}
}

// InputFrame feeds one frame to the merger.
//
// Frames whose codec does not need merging are passed to cb immediately (after
// any pending cached frames, to keep output in order). Otherwise, if
// WillFlush(frame) is true the cached frames are emitted through cb first, and
// then frame is cached. Non-cacheable frames are copied before being cached,
// so the caller may reuse their buffers.
//
// A nil frame flushes the cache and returns false; every other call returns
// true. cb may be nil, in which case output is discarded.
func (m *FrameMerger) InputFrame(frame Frame, cb func(dts, pts uint64, buffer []byte, haveKeyFrame bool)) bool {
	if frame != nil && !m.NeedMerge(frame.CodecID()) {
		m.flushCache(cb)
		if cb != nil {
			cb(frame.DTS(), frame.PTS(), frame.Data(), frame.KeyFrame())
		}
		return true
	}

	// Decide about flushing BEFORE caching the new frame, so the comparison is
	// made against the previously cached frame.
	if m.WillFlush(frame) {
		m.flushCache(cb)
	}
	if frame == nil {
		return false
	}

	if frame.DecodeAble() {
		m.haveDecodeable = true
	}
	m.frameCache = append(m.frameCache, NewFrameCacheAble(frame, false))
	return true
}

// flushCache emits all cached frames as one buffer and resets the cache.
func (m *FrameMerger) flushCache(cb func(dts, pts uint64, buffer []byte, haveKeyFrame bool)) {
	if len(m.frameCache) == 0 {
		return
	}

	last := m.frameCache[len(m.frameCache)-1]
	haveKeyFrame := false
	total := 0
	for _, f := range m.frameCache {
		if f.KeyFrame() {
			haveKeyFrame = true
		}
		total += f.Size() + 4 // room for a start code or length field per frame
	}

	// A single frame is forwarded untouched unless the mode rewrites the prefix.
	buffer := last.Data()
	if len(m.frameCache) > 1 || m.mergeType == MergeMP4NalSize {
		buffer = make([]byte, 0, total)
		for _, f := range m.frameCache {
			m.DoMerge(&buffer, f)
		}
	}

	if cb != nil {
		cb(last.DTS(), last.PTS(), buffer, haveKeyFrame)
	}

	// Drop the references so the cached frames can be collected.
	for i := range m.frameCache {
		m.frameCache[i] = nil
	}
	m.frameCache = m.frameCache[:0]
	m.haveDecodeable = false
}

// NeedMerge reports whether frames of the given codec are merged (H264 and
// H265) rather than passed through.
func (m *FrameMerger) NeedMerge(codec CodecID) bool {
	return codec == CodecH264 || codec == CodecH265
}

// Flush emits any cached frames through cb and leaves the merger empty.
func (m *FrameMerger) Flush(cb func(dts, pts uint64, buffer []byte, haveKeyFrame bool)) {
	if len(m.frameCache) > 0 {
		m.InputFrame(nil, cb)
	}
	m.frameCache = m.frameCache[:0]
	m.haveDecodeable = false
}

// ==================== Frame dispatcher ====================

// FrameDispatcher fans frames out to a set of FrameWriter delegates and keeps
// simple statistics. All methods are guarded by the same mutex.
//
// Delegates are invoked while that mutex is held, so a delegate callback must
// not call back into the same dispatcher.
type FrameDispatcher struct {
	delegates map[*FrameWriter]bool
	mtx       sync.Mutex

	// Statistics.
	videoKeyFrames uint64
	frames         uint64
	lastFrames     uint64
	gopSize        uint64
	gopInterval    time.Duration
	lastKeyFrameTS uint64
}

// NewFrameDispatcher returns an empty dispatcher.
func NewFrameDispatcher() *FrameDispatcher {
	return &FrameDispatcher{delegates: make(map[*FrameWriter]bool)}
}

// AddDelegate registers w. A nil w is ignored.
func (d *FrameDispatcher) AddDelegate(w *FrameWriter) {
	if w == nil {
		return
	}
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if d.delegates == nil {
		// Allow a zero-value dispatcher to be used.
		d.delegates = make(map[*FrameWriter]bool)
	}
	d.delegates[w] = true
}

// DelDelegate unregisters w; it is a no-op if w is not registered.
func (d *FrameDispatcher) DelDelegate(w *FrameWriter) {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	delete(d.delegates, w)
}

// InputFrame updates the statistics and forwards frame to every delegate. It
// returns true if at least one delegate accepted the frame. A nil frame is
// rejected and returns false.
func (d *FrameDispatcher) InputFrame(frame Frame) bool {
	if frame == nil {
		return false
	}

	d.mtx.Lock()
	defer d.mtx.Unlock()

	d.updateStats(frame)

	accepted := false
	for w := range d.delegates {
		// Every delegate sees the frame, even after one has accepted it.
		if w.InputFrame(frame) {
			accepted = true
		}
	}
	return accepted
}

// updateStats folds frame into the counters. The caller must hold d.mtx.
func (d *FrameDispatcher) updateStats(frame Frame) {
	if frame.ConfigFrame() || frame.DropAble() {
		return
	}
	d.frames++
	if !frame.KeyFrame() || frame.TrackType() != TrackVideo {
		return
	}

	d.videoKeyFrames++
	d.gopSize = d.frames - d.lastFrames

	dts := frame.DTS()
	if dts >= d.lastKeyFrameTS {
		d.gopInterval = time.Duration(dts-d.lastKeyFrameTS) * time.Millisecond
	} else {
		// DTS went backwards; an unsigned subtraction would wrap around.
		d.gopInterval = 0
	}

	d.lastFrames = d.frames
	d.lastKeyFrameTS = dts
}

// GetStats returns, in order: the number of counted frames, the number of
// video key frames, the frame count between the last two video key frames,
// and the DTS distance between them.
func (d *FrameDispatcher) GetStats() (uint64, uint64, uint64, time.Duration) {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	return d.frames, d.videoKeyFrames, d.gopSize, d.gopInterval
}

// Size returns the number of registered delegates.
func (d *FrameDispatcher) Size() int {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	return len(d.delegates)
}

// ==================== Frame writer ====================

// FrameWriter adapts a callback to the delegate role used by FrameDispatcher.
type FrameWriter struct {
	OnFrame func(Frame) bool
}

// InputFrame forwards frame to OnFrame and returns its result, or false when
// OnFrame is not set.
func (w *FrameWriter) InputFrame(frame Frame) bool {
	if w.OnFrame != nil {
		return w.OnFrame(frame)
	}
	return false
}