Footman是怎样练成的
序
啦啦啦啦啦啦,我是写bug的小行家
距离上次水文,又过去了半年咯~~
作为打工人来讲,天天忙忙碌碌一回头,却不知自己都干了点啥….
最近在忙碌的工作当中,遇到了一些有趣的问题
感觉有些价值,整理一下
下面我们就聊聊今天的主角,我叫他 Footman
起因
起因说起来比较简单,最初是我们在ws服务内部,服务端对于客户端主动发起的消息推送,是基于channel 或者 阻塞信号等行为实现的.
这时候对于推送的发起方,就需要轮训连接聚丙,然后挨个push
所以性能上不太好保证(尤其是一些地方用到了lock)
这个时候,俺斌哥提出了一个想法,能不能让客户端的write协程直接去读呢?
此时那可谓是一拍即合
但是在最初实现的过程中,我数据结构参考了LRU 使用了map+单向链表进行了实现
而且使用了一个读写锁
此时,斌哥直接提出来第一版修改意见
经过讨论,首先从数据结构来讲,Ring Buffer,其次在实现细节来讲,希望摒弃掉lock的操作
干货时间
首先,我们实现的目的,参考的kafka的逻辑
在这里就存在了Topic的角色(RingBuffer的主体)
且针对与内部进行了一些多Topic订阅的处理
下面挨个的介绍一下内部用到的组件
Topic
先来看topic结构体以及相关函数
type (
Topic struct {
name *string
limit *int
seq *uint64
data []*message
}
)
func (t *Topic) init() {
t.seq = ptr.Uint64(0)
if t.limit == nil || *t.limit == 0 {
t.limit = ptr.Int(10000)
}
t.data = make([]*message, *t.limit)
}
func (t *Topic) Limit(i int) {
t.limit = ptr.Int(i)
}
func (t *Topic) Offset() uint64 {
return atomic.LoadUint64(t.seq)
}
func (t *Topic) idx(offset uint64) int {
return int(offset) % *t.limit
}
func (t *Topic) Find(offset uint64) ([]*message, error) {
seq := atomic.LoadUint64(t.seq)
switch {
case offset > seq:
return nil, ErrOutOfRange
case offset == seq:
return nil, ErrNoData
default:
ret := make([]*message, 0)
// 套圈问题
if ul := uint64(*t.limit); seq > ul {
if off := seq - ul; off > offset {
offset = off
}
}
for offset = offset + 1; offset <= seq; offset++ {
idx := t.idx(offset)
node := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&t.data[idx])))
if node != nil {
ret = append(ret, (*message)(node))
}
}
return ret, nil
}
}
func (t *Topic) Append(tg any) {
msg := NewMessage(t.name, tg)
t.AppendMessage(msg)
}
func (t *Topic) AppendMessage(msg *message) {
seq := atomic.AddUint64(t.seq, 1)
idx := t.idx(seq)
msg.offset = ptr.Uint64(seq)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&t.data[idx])), unsafe.Pointer(msg))
}
这里主要是实现了一个RingBuffer,且对他的message offset分配,基于offset查询等功能的支持
并且利用atomic包进行原子操作,避免锁机制的引入
Server
Server 主要是针对与多个topic的管理,包含创建,初始化,删除等等功能
type (
Svr struct {
topics *sync.Map
opts []Option
}
)
func NewSvr(o ...Option) *Svr {
return &Svr{
topics: &sync.Map{},
opts: o,
}
}
func (s *Svr) t(topicName string) *Topic {
t := &Topic{
name: ptr.String(topicName),
}
for i := range s.opts {
s.opts[i](t)
}
return t
}
func (s *Svr) LoadTopic(topicName string) *Topic {
t, loaded := s.topics.LoadOrStore(topicName, s.t(topicName))
topic := t.(*Topic)
if !loaded {
topic.init()
}
return topic
}
func (s *Svr) RemoveTopic(topicName string) {
_, loaded := s.topics.Load(topicName)
if loaded {
s.topics.Delete(topicName)
}
}
func (s *Svr) Subscribe(topic ...string) *Consumer {
return NewConsumer(s).init().Subscribe(topic...)
}
func (s *Svr) Produce(topic string, data any) {
s.LoadTopic(topic).Append(data)
}
func (s *Svr) ProduceMessage(msg *message) {
s.LoadTopic(msg.Topic()).AppendMessage(msg)
}
processor
processor 主要用于管理当前consumer对于某个topic的进度
processors 主要是对于一组进度管理的封装
type (
processor struct {
offset uint64
topic *Topic
}
processors struct {
m *sync.Map
svr *Svr
}
)
func (p *processor) find() ([]*message, error) {
msg, err := p.topic.Find(p.offset)
if err == nil && len(msg) > 0 {
p.offset = msg[len(msg)-1].Offset()
}
return msg, err
}
func (p *processors) Subscribe(topic ...string) {
for _, v := range topic {
if _, ok := p.m.Load(v); !ok {
top := p.svr.LoadTopic(v)
p.m.Store(v, &processor{
offset: top.Offset(),
topic: top,
})
}
}
}
func (p *processors) Find() ([]*message, error) {
var (
ret = make([]*message, 0)
limit int
ch = make(chan []*message)
ech = make(chan error)
err error
)
p.m.Range(func(key, value any) bool {
limit++
go func(pp *processor) {
info, ce := pp.find()
if ce != nil && !errors.Is(ce, ErrNoData) {
ech <- ce
return
}
ch <- info
}(value.(*processor))
return true
})
for i := 0; i < limit; i++ {
select {
case info := <-ch:
ret = append(ret, info...)
case e := <-ech:
if !errors.Is(e, ErrNoData) {
err = e
}
}
}
close(ch)
close(ech)
return ret, err
}
Consumer
由于核心的查询功能,在processors里已经实现,所以Consumer这里只需要管理processors以及对于find函数进行一些超时管理即可
type(
Consumer struct {
timerAfter time.Duration
svr *Svr
ps *processors
}
)
func (c *Consumer) Timer(d time.Duration) *Consumer {
c.timerAfter = d
return c
}
func (c *Consumer) Subscribe(topic ...string) *Consumer {
c.ps.Subscribe(topic...)
return c
}
func (c *Consumer) init() *Consumer {
c.ps = &processors{
m: &sync.Map{},
svr: c.svr,
}
c.Timer(time.Second / 10)
return c
}
func (c *Consumer) ReadMessage(d time.Duration) ([]*message, error) {
timer := time.NewTimer(0)
ctx, cancel := context.WithCancel(context.Background())
if d > 0 {
ctx, cancel = context.WithTimeout(ctx, d)
} else if d == 0 {
ctx, cancel = context.WithTimeout(ctx, time.Second/2)
}
defer cancel()
for {
select {
case <-timer.C:
ret, err := c.ps.Find()
if err != nil {
return nil, err
}
if len(ret) > 0 {
return ret, nil
}
timer.Reset(c.timerAfter)
case <-ctx.Done():
return nil, ErrReadTimeout
}
}
}
结尾
以上就是整体的实现过程以及思路了.
个人感觉还是有一些参考价值,不过暂时还没有投入使用,欢迎大家拍砖.
打工人继续搬砖去也