提交 e98cb7a0 authored 作者: mooncake9527's avatar mooncake9527

1.配置拆分2.支持注册中心nacos

上级 d106186e
package conf
import (
"bytes"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
"log"
consulapi "github.com/armon/consul-api"
"github.com/spf13/viper"
)
const (
DefaultEndPoint = "consul.default.svc.cluster.local:8500"
DefaultPath = "xmall/conf"
DefaultToken = "9a6bc59e-73d7-7af5-a669-818a200a9ded"
DefaultConfigType = "yml"
)
// JwtOption set the jwt options.
type ConsulOption func(*ConsulParser)
func WithToken(token string) ConsulOption {
return func(o *ConsulParser) {
o.token = token
}
}
func WithConfigType(configType string) ConsulOption {
return func(o *ConsulParser) {
o.configType = configType
}
}
type ConsulParser struct {
endpoint string
configPath string
configType string
token string
cfg *consulapi.Config
consulClient *consulapi.Client
}
func NewConsulParser(endpoint, configPath string, opts ...ConsulOption) (*ConsulParser, error) {
if endpoint == "" {
return nil, xerror.New("endpoint cannot be empty")
}
if configPath == "" {
return nil, xerror.New("configPath cannot be empty")
}
o := &ConsulParser{endpoint: endpoint, configPath: configPath}
for _, opt := range opts {
opt(o)
}
ccfg := consulapi.Config{Address: endpoint, Token: o.token}
o.cfg = &ccfg
consulClient, err := consulapi.NewClient(&ccfg)
if err != nil {
log.Fatal(err)
}
o.consulClient = consulClient
return o, nil
}
func (x *ConsulParser) Read(obj any) error {
kv, _, err := x.consulClient.KV().Get(x.configPath, nil)
if err != nil {
return err
}
viper.SetConfigType(x.configType)
err = viper.ReadConfig(bytes.NewBuffer(kv.Value))
if err != nil {
return err
}
err = viper.Unmarshal(obj)
if err != nil {
return err
}
return nil
}
package conf
import (
"fmt"
"testing"
)
func TestNacosClient_Parse(t *testing.T) {
ncfg := NacosConfig{
IPs: []string{"192.168.1.118"},
Port: 8848,
DataID: "pay-admin.yaml",
Group: "qitu",
}
if err := New(&ncfg, func(content string) (err error) {
fmt.Println(content)
return nil
}, nil).Connect().Watch().Fetch().Parse().Err(); err != nil {
t.Error(err.Error())
return
}
t.Log("success")
}
package conf
import (
"fmt"
"os"
"testing"
"time"
)
var c = make(map[string]interface{})
func TestShow(t *testing.T) {
t.Log(Show(c))
t.Log(Show(make(chan string)))
}
func Test_replaceDSN(t *testing.T) {
dsn := "default:123456@192.168.3.37:6379/0"
t.Log(replaceDSN(dsn))
dsn = "default:123456:192.168.3.37:6379/0"
t.Log(replaceDSN(dsn))
}
func Test_hideSensitiveFields(t *testing.T) {
var keywords []string
keywords = append(keywords, `"dsn"`, `"password"`, `"name"`)
str := Show(c, keywords...)
fmt.Printf(HideSensitiveFields(str))
str = "\ndefault:123456@192.168.3.37:6379/0\n"
fmt.Printf(HideSensitiveFields(str))
}
// test listening for configuration file updates
func TestParse(t *testing.T) {
conf := make(map[string]interface{})
reloads := []func(){
func() {
fmt.Println("close and reconnect mysql")
fmt.Println("close and reconnect redis")
},
}
err := Parse("test.yml", &conf, reloads...)
if err != nil {
t.Error(err)
return
}
time.Sleep(time.Second)
content, _ := os.ReadFile("test.yml")
contentChange := append(content, byte('#'))
time.Sleep(time.Millisecond * 100)
_ = os.WriteFile("test.yml", contentChange, 0666) // change file
time.Sleep(time.Millisecond * 100)
_ = os.WriteFile("test.yml", content, 0666) // recovery documents
time.Sleep(time.Millisecond * 100)
}
func TestParseErr(t *testing.T) {
// result error test
err := Parse("test.yml", nil)
t.Log(err)
// not found error test
err = Parse("notfound.yml", &c)
t.Log(err)
}
func TestParseConfigData(t *testing.T) {
conf := make(map[string]interface{})
data, err := os.ReadFile("test.yml")
if err != nil {
t.Error(err)
return
}
err = ParseConfigData(data, "yaml", &conf)
if err != nil {
t.Error(err)
return
}
t.Log(Show(conf))
}
# app settings
app:
name: "serverNameExample"
env: "dev1"
version: "v0.0.0"
password: "123456"
database:
driver: "mysql"
# mysql settings
mysql:
# dsn format, <user>:<pass>@(127.0.0.1:3306)/<db>?[k=v& ......]
dsn: "root:123456@(192.168.3.37:3306)/account?parseTime=true&loc=Local&charset=utf8,utf8mb4"
# redis settings
redis:
# dsn format, [user]:<pass>@]127.0.0.1:6379/[db]
dsn: "default:123456@192.168.3.37:6379/0"
dialTimeout: 10
readTimeout: 2
writeTimeout: 2
package config
import (
"strings"
"sync"
"time"
"github.com/jinzhu/copier"
"gitlab.wanzhuangkj.com/tush/xpkg/onacos"
"gitlab.wanzhuangkj.com/tush/xpkg/rd/nacos"
alioss "gitlab.wanzhuangkj.com/tush/xpkg/third/alioss"
)
var Cfg = &Config{
l: &sync.RWMutex{},
}
type ICfg interface {
Lock()
Unlock()
RLock()
RUnlock()
GetContent() []byte
SetContent(content []byte)
}
func Write(f func(c *Config)) {
if Cfg == nil {
panic("config is nil, please call config.Init() first")
}
Cfg.l.Lock()
defer Cfg.l.Unlock()
f(Cfg)
}
func Get() *Config {
if Cfg == nil {
panic("config is nil, please call config.Init() first")
}
Cfg.l.RLock()
defer Cfg.l.RUnlock()
c := &Config{}
copier.Copy(c, Cfg)
return c
}
func CronOpen() bool {
enable := false
Read(func(c *Config) {
enable = c.CronJobs.Enable
})
return enable
}
func GetOssConfig() *alioss.AliOssConfig {
ossCfg := &alioss.AliOssConfig{}
Read(func(c *Config) {
ossCfg.Endpoint = c.AliOss.Endpoint
ossCfg.AccessKeyId = c.AliOss.AccessKeyId
ossCfg.AccessKeySecret = c.AliOss.AccessKeySecret
ossCfg.BucketName = c.AliOss.BucketName
ossCfg.BasePath = c.AliOss.BasePath
ossCfg.BucketUrl = c.AliOss.BucketUrl
ossCfg.Region = c.AliOss.Region
})
return ossCfg
}
func Read(f func(c *Config)) {
if Cfg == nil {
panic("config is nil, please call config.Init() first")
}
Cfg.l.RLock()
f(Cfg)
Cfg.l.RUnlock()
}
func IsNotProd() bool {
return !IsProd()
}
func IsProd() bool {
env := "prod"
Read(func(c *Config) {
env = c.App.Env
})
return strings.ToLower(env) == "prod"
}
type Config struct {
l *sync.RWMutex
App App `yaml:"app"`
Auth Auth `yaml:"auth"`
ConfType string `yaml:"-"`
Local Local `yaml:"-"`
Consul Consul `yaml:"consul"`
Nacos onacos.NacosConfig `yaml:"nacos"`
Database []Database `yaml:"database"`
Etcd Etcd `yaml:"etcd"`
Grpc Grpc `yaml:"grpc"`
GrpcClient []GrpcClient `yaml:"grpcClient"`
HTTP HTTP `yaml:"http"`
Jaeger Jaeger `yaml:"jaeger"`
Logger Logger `yaml:"logger"`
NacosRd NacosRd `yaml:"nacosRd"`
Redis Redis `yaml:"redis"`
CronJobs Cron `yaml:"cron"`
NacosConfClient *NacosClient `yaml:"-" json:"-"`
NacosNamingClient *nacos.NacosNamingClient `yaml:"-" json:"-"`
AliOss alioss.AliOssConfig `yaml:"oss"`
Content []byte `yaml:"-"`
}
func (x *Config) GetContent() []byte {
x.l.RLock()
defer x.l.RUnlock()
return x.Content
}
func (x *Config) Lock() {
x.l.Lock()
}
func (x *Config) Unlock() {
x.l.Unlock()
}
func (x *Config) RLock() {
x.l.RLock()
}
func (x *Config) RUnlock() {
x.l.RUnlock()
}
func (x *Config) SetContent(content []byte) {
x.l.Lock()
defer x.l.Unlock()
x.Content = content
}
type Local struct {
Conf string `yaml:"-"`
}
type Auth struct {
Enable bool `yaml:"enable"`
SignKey string `yaml:"signKey"`
Expire time.Duration `yaml:"expire"`
}
type Consul struct {
Conf string `yaml:"conf"`
Addr string `yaml:"addr"`
Token string `yaml:"token"`
}
type Etcd struct {
Addrs []string `yaml:"addrs"`
}
type Jaeger struct {
AgentHost string `yaml:"agentHost"`
AgentPort int `yaml:"agentPort"`
}
type ClientToken struct {
AppID string `yaml:"appID"`
AppKey string `yaml:"appKey"`
Enable bool `yaml:"enable"`
}
type ClientSecure struct {
CaFile string `yaml:"caFile"`
CertFile string `yaml:"certFile"`
KeyFile string `yaml:"keyFile"`
ServerName string `yaml:"serverName"`
Type string `yaml:"type"`
}
type ServerSecure struct {
CaFile string `yaml:"caFile"`
CertFile string `yaml:"certFile"`
KeyFile string `yaml:"keyFile"`
Type string `yaml:"type"`
}
type EmailConfig struct {
SMTPServer string `yaml:"smtpServer"`
SMTPPort int `yaml:"smtpPort"`
SMTPUsername string `yaml:"smtpUsername"`
SMTPPassword string `yaml:"smtpPassword"`
FromEmail string `yaml:"fromEmail"`
ExpireDuration string `yaml:"expireDuration"`
}
type App struct {
CacheType string `yaml:"cacheType"`
EnableCircuitBreaker bool `yaml:"enableCircuitBreaker"`
EnableHTTPProfile bool `yaml:"enableHTTPProfile"`
EnableLimit bool `yaml:"enableLimit"`
EnableMetrics bool `yaml:"enableMetrics"`
EnableStat bool `yaml:"enableStat"`
EnableTrace bool `yaml:"enableTrace"`
Env string `yaml:"env"`
Host string `yaml:"host"`
Name string `yaml:"name"`
RegistryDiscoveryType string `yaml:"registryDiscoveryType"`
TracingSamplingRate float64 `yaml:"tracingSamplingRate"`
Version string `yaml:"version"`
IPRateLimiter IPRateLimiter `yaml:"ipRateLimiter"`
ConfigSourceType string `yaml:"-"` // local,consul,nacos
ConfPath string `yaml:"-"`
PodName string `yaml:"-"`
WebLog WebLog `yaml:"webLog"`
Email EmailConfig `yaml:"email"`
}
type WebLog struct {
Enable bool `yaml:"enable"`
IgnorePaths []string `yaml:"ignorePaths"`
}
type IPRateLimiter struct {
Enable bool `yaml:"enable"`
Sync bool `yaml:"sync"`
Window string `yaml:"window"`
MaxRequests int `yaml:"maxRequests"`
Store RateLimiterStore `yaml:"store"`
}
type RateLimiterStore struct {
Type string `yaml:"redis"`
Addr string `yaml:"addr"`
Password string `yaml:"password"`
DB int `yaml:"db"`
}
func (x IPRateLimiter) Get() (time.Duration, int) {
if x.Window == "" {
return time.Second, 10
}
window, err := time.ParseDuration(x.Window)
if err != nil {
return time.Second, 10
}
if x.MaxRequests <= 0 {
return time.Second, 10
}
return window, x.MaxRequests
}
func (x IPRateLimiter) GetStore() (string, string, int) {
addr := "localhost:6379"
if x.Store.Addr != "" {
addr = x.Store.Addr
}
return addr, x.Store.Password, x.Store.DB
}
type GrpcClient struct {
ClientSecure ClientSecure `yaml:"clientSecure"`
ClientToken ClientToken `yaml:"clientToken"`
Host string `yaml:"host"`
Name string `yaml:"name"`
Port int `yaml:"port"`
RegistryDiscoveryType string `yaml:"registryDiscoveryType"`
Timeout int `yaml:"timeout"`
}
type Sqlite struct {
ConnMaxLifetime int `yaml:"connMaxLifetime"`
DBFile string `yaml:"dbFile"`
EnableLog bool `yaml:"enableLog"`
MaxIdleConns int `yaml:"maxIdleConns"`
MaxOpenConns int `yaml:"maxOpenConns"`
}
type Mysql struct {
ConnMaxLifetime int `yaml:"connMaxLifetime"`
Dsn string `yaml:"dsn"`
EnableLog bool `yaml:"enableLog"`
MastersDsn []string `yaml:"mastersDsn"`
MaxIdleConns int `yaml:"maxIdleConns"`
MaxOpenConns int `yaml:"maxOpenConns"`
SlavesDsn []string `yaml:"slavesDsn"`
}
type Postgresql struct {
ConnMaxLifetime int `yaml:"connMaxLifetime"`
Dsn string `yaml:"dsn"`
EnableLog bool `yaml:"enableLog"`
MaxIdleConns int `yaml:"maxIdleConns"`
MaxOpenConns int `yaml:"maxOpenConns"`
}
type Redis struct {
DialTimeout int `yaml:"dialTimeout"`
Dsn string `yaml:"dsn"`
ReadTimeout int `yaml:"readTimeout"`
WriteTimeout int `yaml:"writeTimeout"`
}
type Database struct {
Name string `yaml:"name"`
Driver string `yaml:"driver"`
Mongodb Mongodb `yaml:"mongodb"`
Mysql Mysql `yaml:"mysql"`
Postgresql Mysql `yaml:"postgresql"`
Sqlite Sqlite `yaml:"sqlite"`
}
type Mongodb struct {
Dsn string `yaml:"dsn"`
}
type Grpc struct {
EnableToken bool `yaml:"enableToken"`
HTTPPort int `yaml:"httpPort"`
Port int `yaml:"port"`
ServerSecure ServerSecure `yaml:"serverSecure"`
}
type Logger struct {
Format string `yaml:"format"`
IsSave bool `yaml:"isSave"`
Level string `yaml:"level"`
LogFileConfig LogFileConfig `yaml:"logFileConfig"`
}
type LogFileConfig struct {
Filename string `yaml:"filename"`
MaxSize int `yaml:"maxSize"`
MaxBackups int `yaml:"maxBackups"`
MaxAge int `yaml:"maxAge"`
IsCompression bool `yaml:"isCompression"`
}
type NacosRd struct {
IPAddr string `yaml:"ipAddr"`
NamespaceID string `yaml:"namespaceID"`
Port int `yaml:"port"`
}
type HTTP struct {
Port int `yaml:"port"`
Timeout int `yaml:"timeout"`
}
type UpgradeConfig struct {
Limits []UpgradeLimit `yaml:"limits"`
}
type UpgradeLimit struct {
MaxCount int64 `mapstructure:"max-count" json:"max-count" yaml:"max-count"`
Duration time.Duration `yaml:"duration"`
Tip string `yaml:"tip"`
}
type Cron struct {
// Jobs []CronJob`yaml:"jobs"`
Enable bool `yaml:"enable"`
}
type CronJob struct {
Name string `yaml:"name"`
TimeSpec string `yaml:"timeSpec"`
IsRunOnce bool ` yaml:"isRunOnce"` // if the task is only run once
Enable bool `yaml:"enable"`
Args string `yaml:"args"`
}
package config
import (
"bytes"
"context"
"time"
"github.com/hashicorp/consul/api"
"github.com/spf13/viper"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/hashUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
)
type ConsulConfFetcher struct {
isNew bool
confKey string
err error
ctx context.Context
cfg ICfg
loads []func(conf []byte)
}
func NewFetcher(confKey string, cfg ICfg, loads ...func(conf []byte)) *ConsulConfFetcher {
return &ConsulConfFetcher{
cfg: cfg,
confKey: confKey,
ctx: context.Background(),
loads: loads,
}
}
func (x *ConsulConfFetcher) fetch() error {
x.sync().parse()
return x.err
}
func (x *ConsulConfFetcher) sync() *ConsulConfFetcher {
consul := api.DefaultConfig()
client, err := api.NewClient(consul)
if err != nil {
x.err = err
return x
}
kv, _, err := client.KV().Get(x.confKey, &api.QueryOptions{})
if err != nil {
x.err = err
return x
}
if kv == nil {
x.err = xerror.New("[consul]请先配置consul")
return x
}
if hashUtils.Md5(kv.Value) != hashUtils.Md5(x.cfg.GetContent()) {
x.isNew = true
x.cfg.SetContent(kv.Value)
}
return x
}
func (x *ConsulConfFetcher) parse() *ConsulConfFetcher {
if len(x.cfg.GetContent()) == 0 || !x.isNew {
return x
}
viper.SetConfigType("yaml")
if err := viper.ReadConfig(bytes.NewBuffer([]byte(x.cfg.GetContent()))); err != nil {
x.err = err
return x
}
x.cfg.Lock()
if err := viper.Unmarshal(x.cfg); err != nil {
x.cfg.Unlock()
x.err = err
return x
}
x.cfg.Unlock()
x.isNew = false
for _, load := range x.loads {
load(x.cfg.GetContent())
}
logger.Debug("[consul] conf updated")
return x
}
func (x *ConsulConfFetcher) Start() error {
if err := x.fetch(); err != nil {
return err
}
duration := time.Minute * 5
ticker := time.NewTicker(duration)
go func() {
for {
select {
case <-x.ctx.Done():
return
case <-ticker.C:
if err := x.fetch(); err != nil {
logger.Error("[consul]fetch config from consul error", logger.Err(err))
continue
}
}
}
}()
return nil
}
package config
import (
"bytes"
"os"
"strings"
"github.com/spf13/cast"
"github.com/spf13/viper"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
)
var (
extend IExtend // extend pointer
)
type IExtend interface {
Lock()
Unlock()
}
func init() {
if err := parse(); err != nil {
panic(err)
}
}
func parse() (err error) {
confType := loadEnvWithDefault("CONF_TYPE", "local")
logger.Infof("[conf]conf type:%s", confType)
Cfg.ConfType = confType
if confType == "local" {
localConf()
return parseLocal(Cfg.Local.Conf, Cfg)
}
if confType == "consul" {
consulConf()
confKey := Cfg.Consul.Conf
return NewFetcher(confKey, Cfg, func(conf []byte) {
ParseExtend()
}).Start()
}
if confType == "nacos" {
nacosConf()
nc := NewNacos(&Cfg.Nacos, func(content []byte) (err error) {
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewReader(content)); err != nil {
return err
}
if err := v.Unmarshal(&Cfg); err != nil {
return err
}
return ParseExtend()
}, logger.Get())
if err := nc.Connect().Watch().Fetch().Parse().Err(); err != nil {
return err
}
Cfg.NacosConfClient = nc
}
return nil
}
func SetExtend(ext IExtend) {
extend = ext
}
func ParseExtend() error {
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewReader(Cfg.GetContent())); err != nil {
return err
}
if extend != nil {
if subv := v.Sub("extend"); subv != nil {
extend.Lock()
defer extend.Unlock()
if err := subv.Unmarshal(extend); err != nil {
return err
}
}
}
return nil
}
func localConf() {
confFile := loadEnvWithDefault("CONF_FILES", "./conf/conf.yaml")
Write(func(c *Config) {
c.Local.Conf = confFile
})
}
func nacosConf() {
host := loadEnvWithDefault("NACOS_HOST", "localhost")
port := loadEnvWithDefault("NACOS_PORT", "8848")
contextPath := loadEnvWithDefault("NACOS_CONTEXT", "/nacos")
namespace := loadEnvWithDefault("NACOS_NAMESPACE", "")
dataID := loadEnvWithDefault("NACOS_DATA_ID", "operator-admin.yaml")
group := loadEnvWithDefault("NACOS_GROUP", "qitu")
logger.Infof("[conf]conf host:%s namespace:%s dataId:%s group:%s", host, namespace, dataID, group)
Cfg.Nacos.IPs = strings.Split(host, ",")
Cfg.Nacos.Port = cast.ToInt(port)
Cfg.Nacos.Conf.Namespace = namespace
Cfg.Nacos.Conf.DataID = dataID
Cfg.Nacos.Conf.Group = group
Cfg.Nacos.Conf.ContextPath = contextPath
}
func consulConf() {
if confFile := os.Getenv("CONSUL_CONFIG_FILES"); confFile != "" {
Write(func(c *Config) {
c.Consul.Conf = confFile
})
}
if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" {
Write(func(c *Config) {
c.Consul.Addr = addr
})
}
if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" {
Write(func(c *Config) {
c.Consul.Token = token
})
}
}
func loadEnvWithDefault(key, defaultValue string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return defaultValue
}
// Package conf is parsing yaml, json, toml configuration files to go struct. // Package conf is parsing yaml, json, toml configuration files to go struct.
package conf package config
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
// Parse configuration files to struct, including yaml, toml, json, etc., and turn on listening for configuration file changes if fs is not empty // parseLocal configuration files to struct, including yaml, toml, json, etc., and turn on listening for configuration file changes if fs is not empty
func Parse(configFile string, obj interface{}, reloads ...func()) error { func parseLocal(configFile string, obj ICfg, reloads ...func()) error {
confFileAbs, err := filepath.Abs(configFile) confFileAbs, err := filepath.Abs(configFile)
if err != nil { if err != nil {
return err return err
...@@ -28,15 +29,16 @@ func Parse(configFile string, obj interface{}, reloads ...func()) error { ...@@ -28,15 +29,16 @@ func Parse(configFile string, obj interface{}, reloads ...func()) error {
viper.AddConfigPath(filePathStr) // path viper.AddConfigPath(filePathStr) // path
viper.SetConfigName(filename) // file name viper.SetConfigName(filename) // file name
viper.SetConfigType(ext) // get the configuration type from the file name viper.SetConfigType(ext) // get the configuration type from the file name
err = viper.ReadInConfig() if err = viper.ReadInConfig(); err != nil {
if err != nil {
return xerror.New(err.Error()) return xerror.New(err.Error())
} }
err = viper.Unmarshal(obj) obj.Lock()
if err != nil { if err = viper.Unmarshal(obj); err != nil {
obj.Unlock()
return xerror.New(err.Error()) return xerror.New(err.Error())
} }
obj.Unlock()
if len(reloads) > 0 { if len(reloads) > 0 {
watchConfig(obj, reloads...) watchConfig(obj, reloads...)
...@@ -45,27 +47,17 @@ func Parse(configFile string, obj interface{}, reloads ...func()) error { ...@@ -45,27 +47,17 @@ func Parse(configFile string, obj interface{}, reloads ...func()) error {
return nil return nil
} }
// ParseConfigData parse data to struct func watchConfig(obj ICfg, reloads ...func()) {
func ParseConfigData(data []byte, format string, obj interface{}) error {
viper.SetConfigType(format)
err := viper.ReadConfig(bytes.NewBuffer(data))
if err != nil {
return err
}
return viper.Unmarshal(obj)
}
// listening for profile updates
func watchConfig(obj interface{}, reloads ...func()) {
viper.WatchConfig() viper.WatchConfig()
// Note: OnConfigChange is called twice on Windows // Note: OnConfigChange is called twice on Windows
viper.OnConfigChange(func(e fsnotify.Event) { viper.OnConfigChange(func(e fsnotify.Event) {
err := viper.Unmarshal(obj) obj.Lock()
if err != nil { if err := viper.Unmarshal(obj); err != nil {
obj.Unlock()
fmt.Println("viper.Unmarshal error: ", err) fmt.Println("viper.Unmarshal error: ", err)
} else { } else {
obj.Unlock()
for _, reload := range reloads { for _, reload := range reloads {
reload() reload()
} }
...@@ -73,16 +65,21 @@ func watchConfig(obj interface{}, reloads ...func()) { ...@@ -73,16 +65,21 @@ func watchConfig(obj interface{}, reloads ...func()) {
}) })
} }
func Show(hiddenFields ...string) string {
return show(Cfg, hiddenFields...)
}
// Show print configuration information (hide sensitive fields) // Show print configuration information (hide sensitive fields)
func Show(obj interface{}, fields ...string) string { func show(obj ICfg, fields ...string) string {
var out string var out string
obj.RLock()
data, err := json.MarshalIndent(obj, "", " ") data, err := json.MarshalIndent(obj, "", " ")
if err != nil { if err != nil {
obj.RUnlock()
fmt.Println("json.MarshalIndent error: ", err) fmt.Println("json.MarshalIndent error: ", err)
return "" return ""
} }
obj.RUnlock()
buf := bufio.NewReader(bytes.NewReader(data)) buf := bufio.NewReader(bytes.NewReader(data))
for { for {
line, err := buf.ReadString('\n') line, err := buf.ReadString('\n')
...@@ -90,10 +87,8 @@ func Show(obj interface{}, fields ...string) string { ...@@ -90,10 +87,8 @@ func Show(obj interface{}, fields ...string) string {
break break
} }
fields = append(fields, `"dsn"`, `"password"`, `"pwd"`, `"signKey"`, `"access-key-id"`, `"access-key-secret"`) fields = append(fields, `"dsn"`, `"password"`, `"pwd"`, `"signKey"`, `"access-key-id"`, `"access-key-secret"`)
out += HideSensitiveFields(line, fields...) out += HideSensitiveFields(line, fields...)
} }
return out return out
} }
......
package conf package config
import ( import (
"fmt"
"github.com/nacos-group/nacos-sdk-go/clients" "github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client" "github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo" "github.com/nacos-group/nacos-sdk-go/vo"
"gitlab.wanzhuangkj.com/tush/xpkg/onacos"
"go.uber.org/zap" "go.uber.org/zap"
) )
type NacosConfig struct {
IPs []string `json:"ips"`
Port int `json:"port"`
Namespace string `json:"namespace"`
ContextPath string `json:"context"`
DataID string `json:"dataId"`
Group string `json:"group"`
}
type NacosClient struct { type NacosClient struct {
cfg *NacosConfig cc *constant.ClientConfig
sc []constant.ServerConfig
cfg *onacos.NacosConfig
cli config_client.IConfigClient cli config_client.IConfigClient
content string content []byte
parseFn func(content string) (err error) parseFn func(content []byte) (err error)
e error e error
l *zap.Logger l *zap.Logger
} }
type Parser func(content string) (err error) type Parser func(content []byte) (err error)
func New(cfg *NacosConfig, p Parser, l *zap.Logger) *NacosClient { func NewNacos(cfg *onacos.NacosConfig, p Parser, l *zap.Logger) *NacosClient {
nc := NacosClient{} nc := NacosClient{}
nc.cfg = cfg nc.cfg = cfg
nc.parseFn = p nc.parseFn = p
...@@ -40,6 +36,14 @@ func (x *NacosClient) Err() error { ...@@ -40,6 +36,14 @@ func (x *NacosClient) Err() error {
return x.e return x.e
} }
func (x *NacosClient) GetCC() *constant.ClientConfig {
return x.cc
}
func (x *NacosClient) GetSC() []constant.ServerConfig {
return x.sc
}
func (x *NacosClient) GetCli() config_client.IConfigClient { func (x *NacosClient) GetCli() config_client.IConfigClient {
return x.cli return x.cli
} }
...@@ -48,12 +52,12 @@ func (x *NacosClient) Connect() *NacosClient { ...@@ -48,12 +52,12 @@ func (x *NacosClient) Connect() *NacosClient {
if x.e != nil { if x.e != nil {
return x return x
} }
clientConfig := *constant.NewClientConfig( x.cc = constant.NewClientConfig(
constant.WithNamespaceId(x.cfg.Namespace), // 命名空间ID constant.WithNamespaceId(x.cfg.Conf.Namespace), // 命名空间ID
constant.WithTimeoutMs(5000), // 请求超时时间 constant.WithTimeoutMs(5000), // 请求超时时间
constant.WithNotLoadCacheAtStart(false), // 启动时不读取缓存 constant.WithNotLoadCacheAtStart(false), // 启动时不读取缓存
constant.WithLogDir("/tmp/nacos/log"), // 日志目录 constant.WithLogDir("/tmp/nacos/log"), // 日志目录
constant.WithCacheDir("/tmp/nacos/cache"), // 缓存目录 constant.WithCacheDir("/tmp/nacos/cache"), // 缓存目录
// constant.WithUsername("nacos"), // 用户名 // constant.WithUsername("nacos"), // 用户名
// constant.WithPassword("nacos"), // 密码 // constant.WithPassword("nacos"), // 密码
) )
...@@ -62,11 +66,12 @@ func (x *NacosClient) Connect() *NacosClient { ...@@ -62,11 +66,12 @@ func (x *NacosClient) Connect() *NacosClient {
serverConfig := constant.NewServerConfig( serverConfig := constant.NewServerConfig(
ip, // Nacos服务地址 ip, // Nacos服务地址
uint64(x.cfg.Port), // Nacos服务端口 uint64(x.cfg.Port), // Nacos服务端口
constant.WithContextPath(x.cfg.ContextPath), constant.WithContextPath(x.cfg.Conf.ContextPath),
) )
serverConfigs = append(serverConfigs, *serverConfig) serverConfigs = append(serverConfigs, *serverConfig)
} }
if x.cli, x.e = clients.NewConfigClient(vo.NacosClientParam{ClientConfig: &clientConfig, ServerConfigs: serverConfigs}); x.e != nil { x.sc = serverConfigs
if x.cli, x.e = clients.NewConfigClient(vo.NacosClientParam{ClientConfig: x.cc, ServerConfigs: x.sc}); x.e != nil {
return x return x
} }
return x return x
...@@ -76,8 +81,11 @@ func (x *NacosClient) Fetch() *NacosClient { ...@@ -76,8 +81,11 @@ func (x *NacosClient) Fetch() *NacosClient {
if x.e != nil { if x.e != nil {
return x return x
} }
if x.content, x.e = x.cli.GetConfig(vo.ConfigParam{DataId: x.cfg.DataID, Group: x.cfg.Group}); x.e != nil { var content string
if content, x.e = x.cli.GetConfig(vo.ConfigParam{DataId: x.cfg.Conf.DataID, Group: x.cfg.Conf.Group}); x.e != nil {
return x return x
} else {
x.content = []byte(content)
} }
return x return x
} }
...@@ -86,7 +94,7 @@ func (x *NacosClient) Parse() *NacosClient { ...@@ -86,7 +94,7 @@ func (x *NacosClient) Parse() *NacosClient {
if x.e != nil { if x.e != nil {
return x return x
} }
x.e = x.parseFn(x.content) x.e = x.parseFn([]byte(x.content))
return x return x
} }
...@@ -95,9 +103,10 @@ func (x *NacosClient) Watch() *NacosClient { ...@@ -95,9 +103,10 @@ func (x *NacosClient) Watch() *NacosClient {
return x return x
} }
x.e = x.cli.ListenConfig(vo.ConfigParam{ x.e = x.cli.ListenConfig(vo.ConfigParam{
DataId: x.cfg.DataID, DataId: x.cfg.Conf.DataID,
Group: x.cfg.Group, Group: x.cfg.Conf.Group,
OnChange: func(namespace, group, dataId, data string) { OnChange: func(namespace, group, dataId, data string) {
fmt.Println("[nacos]conf changed")
_ = x.Parse() _ = x.Parse()
}, },
}) })
......
app:
name: "operator-admin"
env: "dev"
version: "v0.0.1"
host: "0.0.0.0"
enableStat: false
enableMetrics: true
enableHTTPProfile: true
enableLimit: false
enableCircuitBreaker: false
enableTrace: false
tracingSamplingRate: 1.0
registryDiscoveryType: "consul" # local nacos consul
cacheType: "memory" # memory redis
ipRateLimiter:
enable: true
window: 1m
maxRequests: 600
sync: true
oss:
endpoint: https://oss-cn-hangzhou.aliyuncs.com
access-key-id: LTAI5tD3VA4pi1pcz48yYgXL
access-key-secret: sus17wfcbjB6J9hDeJXXFtJf9LPN0S
bucket-name: test-wz-main
base-path: fota/upgrade
region-id: oss-cn-hangzhou
bucket-url: https://test-wz-main.oss-cn-hangzhou.aliyuncs.com
http:
port: 9001
timeout: 0
logger:
level: "debug"
format: "console"
isSave: true
logFileConfig:
filename: "/tmp/operator-admin.log"
maxSize: 100
maxBackups: 100000
maxAge: 360
isCompression: true
database:
- name: "wz_qitu"
driver: "mysql"
mysql:
dsn: "root:123456@tcp(localhost:30006)/wz_qitu?parseTime=true&loc=Local&charset=utf8,utf8mb4"
enableLog: true
maxIdleConns: 10
maxOpenConns: 100
connMaxLifetime: 30
redis:
dsn: "default:password123@localhost:30079/6"
dialTimeout: 10
readTimeout: 2
writeTimeout: 2
auth:
enable: true
expire: 6h
signKey: PEaWWIxffGpp8Dfl3z5SjE5WeeDStc1T
cron:
enable: true
...@@ -58,6 +58,7 @@ require ( ...@@ -58,6 +58,7 @@ require (
require ( require (
github.com/bwmarrin/snowflake v0.3.0 github.com/bwmarrin/snowflake v0.3.0
github.com/duke-git/lancet/v2 v2.3.4 github.com/duke-git/lancet/v2 v2.3.4
github.com/hashicorp/consul/api v1.12.0
github.com/mojocn/base64Captcha v1.3.8 github.com/mojocn/base64Captcha v1.3.8
github.com/nacos-group/nacos-sdk-go v1.1.6 github.com/nacos-group/nacos-sdk-go v1.1.6
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
...@@ -65,13 +66,23 @@ require ( ...@@ -65,13 +66,23 @@ require (
require ( require (
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1704 // indirect github.com/aliyun/alibaba-cloud-sdk-go v1.61.1704 // indirect
github.com/armon/go-metrics v0.3.10 // indirect
github.com/buger/jsonparser v1.1.1 // indirect github.com/buger/jsonparser v1.1.1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/go-errors/errors v1.0.1 // indirect github.com/go-errors/errors v1.0.1 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/mock v1.6.0 // indirect github.com/golang/mock v1.6.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect github.com/google/go-cmp v0.7.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.2.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/kr/pretty v0.3.1 // indirect github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
go.uber.org/goleak v1.3.0 // indirect go.uber.org/goleak v1.3.0 // indirect
golang.org/x/image v0.23.0 // indirect golang.org/x/image v0.23.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
......
差异被折叠。
package onacos
type NacosConfig struct {
IPs []string `yaml:"ips"`
Port int `yaml:"port"`
Conf NacosConfConfig `yaml:"conf"`
Rd NacosSvcConfig `yaml:"rd"`
}
type NacosConfConfig struct {
Namespace string `yaml:"namespace"`
ContextPath string `yaml:"context"`
DataID string `yaml:"dataId"`
Group string `yaml:"group"`
}
type NacosSvcConfig struct {
IP string `yaml:"ip"`
Port int `yaml:"port"`
ServiceName string `yaml:"serviceName"`
GroupName string `yaml:"groupName"`
ClusterName string `yaml:"clusterName"`
Weight int `yaml:"weight"`
Enable bool `yaml:"enable"`
Healthy bool `yaml:"healthy"`
Ephemeral bool `yaml:"ephemeral"`
}
package nacos
import (
"gitlab.wanzhuangkj.com/tush/xpkg/onacos"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
)
type NacosNamingClient struct {
cc *constant.ClientConfig
sc []constant.ServerConfig
cfg *onacos.NacosConfig
cli naming_client.INamingClient
}
func New(cfg *onacos.NacosConfig, cc *constant.ClientConfig, sc []constant.ServerConfig) (*NacosNamingClient, error) {
nnc := NacosNamingClient{
cc: cc,
sc: sc,
cfg: cfg,
}
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: nnc.cc,
ServerConfigs: sc,
},
)
if err != nil {
return nil, err
}
nnc.cli = namingClient
return &nnc, nil
}
func (x *NacosNamingClient) Register() error {
_, err := x.cli.RegisterInstance(vo.RegisterInstanceParam{
Ip: x.cfg.Rd.IP,
Port: uint64(x.cfg.Rd.Port),
ServiceName: x.cfg.Rd.ServiceName,
Weight: float64(x.cfg.Rd.Weight),
ClusterName: x.cfg.Rd.ClusterName,
GroupName: x.cfg.Rd.GroupName,
Enable: x.cfg.Rd.Enable,
Healthy: x.cfg.Rd.Healthy,
Ephemeral: x.cfg.Rd.Ephemeral,
})
return err
}
func (x *NacosNamingClient) Deregister() error {
_, err := x.cli.DeregisterInstance(vo.DeregisterInstanceParam{
Ip: x.cfg.Rd.IP,
Port: uint64(x.cfg.Rd.Port),
Cluster: x.cfg.Rd.ClusterName,
ServiceName: x.cfg.Rd.ServiceName,
Ephemeral: x.cfg.Rd.Ephemeral,
GroupName: x.cfg.Rd.GroupName,
})
return err
}
func (x *NacosNamingClient) SelectInstances(groupName, serviceName string, healthOnly bool) ([]model.Instance, error) {
return x.cli.SelectInstances(vo.SelectInstancesParam{
ServiceName: serviceName,
GroupName: groupName,
HealthyOnly: healthOnly,
})
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论