提交 9d0883b5 authored 作者: mooncake9527's avatar mooncake9527

修改event bus

上级 c93b0b27
package eventbus package eventbus
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
...@@ -15,7 +16,7 @@ type BusSubscriber interface { ...@@ -15,7 +16,7 @@ type BusSubscriber interface {
} }
type BusPublisher interface { type BusPublisher interface {
Publish(topic string, args ...interface{}) Publish(ctx context.Context, topic string, args ...interface{})
} }
type BusController interface { type BusController interface {
...@@ -106,7 +107,7 @@ func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error { ...@@ -106,7 +107,7 @@ func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error {
return fmt.Errorf("topic %s doesn't exist", topic) return fmt.Errorf("topic %s doesn't exist", topic)
} }
func (bus *EventBus) Publish(topic string, args ...interface{}) { func (bus *EventBus) Publish(ctx context.Context, topic string, args ...interface{}) {
bus.lock.Lock() bus.lock.Lock()
defer bus.lock.Unlock() defer bus.lock.Unlock()
if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) { if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
...@@ -117,7 +118,7 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) { ...@@ -117,7 +118,7 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.removeHandler(topic, i) bus.removeHandler(topic, i)
} }
if !handler.async { if !handler.async {
bus.doPublish(handler, args...) bus.doPublish(ctx, handler, args...)
} else { } else {
bus.wg.Add(1) bus.wg.Add(1)
if handler.transactional { if handler.transactional {
...@@ -126,24 +127,27 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) { ...@@ -126,24 +127,27 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock() bus.lock.Lock()
} }
go func() { go func() {
bus.doPublishAsync(handler, args...) bus.doPublishAsync(ctx, handler, args...)
}() }()
} }
} }
} }
} }
func (bus *EventBus) doPublish(handler *eventHandler, args ...interface{}) { func (bus *EventBus) doPublish(ctx context.Context, handler *eventHandler, args ...interface{}) {
passedArguments := bus.setUpPublish(handler, args...) allArgs := make([]any, 0, len(args)+1)
allArgs = append(allArgs, ctx)
allArgs = append(allArgs, args...)
passedArguments := bus.setUpPublish(handler, allArgs...)
handler.callBack.Call(passedArguments) handler.callBack.Call(passedArguments)
} }
func (bus *EventBus) doPublishAsync(handler *eventHandler, args ...interface{}) { func (bus *EventBus) doPublishAsync(ctx context.Context, handler *eventHandler, args ...interface{}) {
defer bus.wg.Done() defer bus.wg.Done()
if handler.transactional { if handler.transactional {
defer handler.Unlock() defer handler.Unlock()
} }
bus.doPublish(handler, args...) bus.doPublish(ctx, handler, args...)
} }
func (bus *EventBus) removeHandler(topic string, idx int) { func (bus *EventBus) removeHandler(topic string, idx int) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论