提交 6f112d71 authored 作者: mooncake9527's avatar mooncake9527

update

上级 e98cb7a0
......@@ -307,6 +307,7 @@ type Redis struct {
type Database struct {
Name string `yaml:"name"`
Driver string `yaml:"driver"`
Default bool `yaml:"default"`
Mongodb Mongodb `yaml:"mongodb"`
Mysql Mysql `yaml:"mysql"`
Postgresql Mysql `yaml:"postgresql"`
......
......@@ -26,6 +26,9 @@ func init() {
}
func parse() (err error) {
if hostname := os.Getenv("HOSTNAME"); hostname != "" {
Cfg.App.PodName = hostname
}
confType := loadEnvWithDefault("CONF_TYPE", "local")
logger.Infof("[conf]conf type:%s", confType)
Cfg.ConfType = confType
......
package consts
var (
DefaultSchema string
)
// Package database provides database client initialization.
package database
import (
"strings"
"sync"
"gitlab.wanzhuangkj.com/tush/xpkg/config"
"gitlab.wanzhuangkj.com/tush/xpkg/consts"
"gitlab.wanzhuangkj.com/tush/xpkg/sgorm"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
)
var (
gdbs = make(map[string]*sgorm.DB)
gdbOnce sync.Once
ErrRecordNotFound = sgorm.ErrRecordNotFound
)
// InitDB connect database
func InitDB() (err error) {
var databases []config.Database
config.Read(func(c *config.Config) {
databases = c.Database
})
for _, db := range databases {
if db.Default {
consts.DefaultSchema = db.Name
}
switch strings.ToLower(db.Driver) {
case sgorm.DBDriverMysql, sgorm.DBDriverTidb:
gdb, err := InitMysql(&db)
if err != nil {
return err
}
gdbs[db.Name] = gdb
default:
return xerror.New("InitDB error, please modify the correct 'database' configuration at yaml file. " +
"Refer to https://xmall/blob/main/configs/xmall.yml#L85")
}
}
return nil
}
type XDB struct {
Schema string
}
func (x XDB) DB() *sgorm.DB {
return DB(x.Schema)
}
func DB(name string) *sgorm.DB {
conn, ok := gdbs[name]
if !ok {
err := xerror.Newf("gdb[%s] not initialized", name)
panic(err)
}
return conn
}
func CloseDB() error {
var errs []error
for _, conn := range gdbs {
err := sgorm.CloseDB(conn)
if err != nil {
errs = append(errs, err)
}
}
return xerror.Join(errs...)
}
package database
import (
"time"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/ctxUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/config"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
"gitlab.wanzhuangkj.com/tush/xpkg/sgorm"
"gitlab.wanzhuangkj.com/tush/xpkg/sgorm/mysql"
"gitlab.wanzhuangkj.com/tush/xpkg/utils"
)
// InitMysql connect mysql
func InitMysql(dbConfig *config.Database) (*sgorm.DB, error) {
enableTrace := false
config.Read(func(c *config.Config) {
enableTrace = c.App.EnableTrace
})
opts := []mysql.Option{
mysql.WithMaxIdleConns(dbConfig.Mysql.MaxIdleConns),
mysql.WithMaxOpenConns(dbConfig.Mysql.MaxOpenConns),
mysql.WithConnMaxLifetime(time.Duration(dbConfig.Mysql.ConnMaxLifetime) * time.Minute),
}
if dbConfig.Mysql.EnableLog {
opts = append(opts,
mysql.WithLogging(logger.Get()),
mysql.WithLogRequestIDKey(ctxUtils.ContextTraceIDKey),
)
}
if enableTrace {
opts = append(opts, mysql.WithEnableTrace())
}
// setting mysql slave and master dsn addresses
//opts = append(opts, mysql.WithRWSeparation(
// mysqlCfg.SlavesDsn,
// mysqlCfg.MastersDsn...,
//))
// add custom gorm plugin
//opts = append(opts, mysql.WithGormPlugin(yourPlugin))
dsn := utils.AdaptiveMysqlDsn(dbConfig.Mysql.Dsn)
db, err := mysql.Init(dsn, opts...)
if err != nil {
return nil, err
}
sensitiveFields := append([]string{}, `"dsn"`, `"username"`, `"password"`, `"pwd"`, `"signKey"`, `"access-key-id"`, `"access-key-secret"`)
insensitiveDsn := config.HideSensitiveFields(dbConfig.Mysql.Dsn, sensitiveFields...)
logger.Infof("database[%s] connected.", insensitiveDsn)
return db, nil
}
package database
import (
"sync"
"time"
"github.com/dgraph-io/ristretto"
"github.com/jinzhu/copier"
"gitlab.wanzhuangkj.com/tush/xpkg/config"
"gitlab.wanzhuangkj.com/tush/xpkg/goredis"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
"gitlab.wanzhuangkj.com/tush/xpkg/tracer"
)
var (
// ErrCacheNotFound No hit cache
ErrCacheNotFound = goredis.ErrRedisNotFound
)
var (
redisCli *goredis.Client
redisCliOnce sync.Once
memoryCache *ristretto.Cache
cacheType string
)
func GetCacheType() string {
return cacheType
}
func GetMemoryCache() *ristretto.Cache {
return memoryCache
}
// InitCache initial cache
func InitCache(cType string) {
cacheType = cType
if cType == "redis" {
redisCli = GetRedisCli()
}
if cType == "memory" {
memoryCache = newMemoryCache()
}
}
func newMemoryCache() *ristretto.Cache {
config := &ristretto.Config{
NumCounters: 1e7, // 跟踪键访问频率的计数器数量(10M)
MaxCost: 1 << 30, // 最大内存成本(1GB)
BufferItems: 64, // 操作缓冲大小
}
cache, err := ristretto.NewCache(config)
if err != nil {
panic(err)
}
return cache
}
// InitRedis connect redis
func InitRedis() {
var redisCfg config.Redis
enableTrace := false
config.Read(func(c *config.Config) {
copier.Copy(&redisCfg, &c.Redis)
enableTrace = c.App.EnableTrace
})
opts := []goredis.Option{
goredis.WithLogger(logger.Get()),
goredis.WithDialTimeout(time.Duration(redisCfg.DialTimeout) * time.Second),
goredis.WithReadTimeout(time.Duration(redisCfg.ReadTimeout) * time.Second),
goredis.WithWriteTimeout(time.Duration(redisCfg.WriteTimeout) * time.Second),
}
if enableTrace {
opts = append(opts, goredis.WithTracing(tracer.GetProvider()))
}
var err error
redisCli, err = goredis.Init(redisCfg.Dsn, opts...)
if err != nil {
panic("init redis error: " + err.Error())
}
}
// GetRedisCli get redis client
func GetRedisCli() *goredis.Client {
if redisCli == nil {
redisCliOnce.Do(func() {
InitRedis()
})
}
return redisCli
}
// CloseRedis close redis
func CloseRedis() error {
return goredis.Close(redisCli)
}
package eventbus
var Eb = New()
const (
TopicDBInitFinish string = "event:application:database:init:finish" // 数据库连接初始化完成
TopicCacheInitFinish string = "event:application:cache:init:finish" // cache初始化完成
TopicCronInitFinish string = "event:application:cron:init:finish" // 定时任务初始化完成
TopicCoreInitFinish string = "event:application:core:init:finish" // 核心服务初始化完成
TopicApplicationClose string = "event:application:close" // 应用关闭
)
......@@ -8,7 +8,6 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/alicebob/miniredis/v2 v2.23.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6
github.com/dgraph-io/ristretto v0.1.1
github.com/felixge/fgprof v0.9.3
github.com/fsnotify/fsnotify v1.5.4
......@@ -57,7 +56,9 @@ require (
require (
github.com/bwmarrin/snowflake v0.3.0
github.com/dlclark/regexp2 v1.11.5
github.com/duke-git/lancet/v2 v2.3.4
github.com/go-redsync/redsync/v4 v4.13.0
github.com/hashicorp/consul/api v1.12.0
github.com/mojocn/base64Captcha v1.3.8
github.com/nacos-group/nacos-sdk-go v1.1.6
......@@ -73,9 +74,11 @@ require (
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.2.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
......@@ -112,8 +115,8 @@ require (
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/spec v0.20.4 // indirect
github.com/go-openapi/swag v0.19.15 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/glog v1.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
......
......@@ -65,8 +65,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.1704/go.mod h1:RcDobYh8k5VP6TNybz9m
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6 h1:G1bPvciwNyF7IUmKXNt9Ak3m6u9DE1rF+RmtIkBpVdA=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.3.10 h1:FR+drcQStOe+32sYyJYyZ7FIdgoGGBnwLl+flodp8Uo=
github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
......@@ -120,6 +118,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ=
github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/duke-git/lancet/v2 v2.3.4 h1:8XGI7P9w+/GqmEBEXYaH/XuNiM0f4/90Ioti0IvYJls=
github.com/duke-git/lancet/v2 v2.3.4/go.mod h1:zGa2R4xswg6EG9I6WnyubDbFO/+A/RROxIbXcwryTsc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
......@@ -195,6 +195,14 @@ github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91
github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA=
github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
......@@ -243,6 +251,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
......@@ -292,8 +302,9 @@ github.com/hashicorp/consul/api v1.12.0 h1:k3y1FYv6nuKyNTqj6w9gXOx5r5CfLj/k/euUe
github.com/hashicorp/consul/api v1.12.0/go.mod h1:6pVBMo0ebnYdt2S3H87XhekM/HHrUoTD2XXb/VrZVy0=
github.com/hashicorp/consul/sdk v0.8.0 h1:OJtKBtEjboEZvG6AOUdh4Z1Zbyu0WcxQ0qatRrZHTVU=
github.com/hashicorp/consul/sdk v0.8.0/go.mod h1:GBvyrGALthsZObzUGsfgHZQDXjg4lOjagTIwIR1vPms=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
......@@ -307,8 +318,9 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
......@@ -502,6 +514,8 @@ github.com/redis/go-redis/extra/redisotel/v9 v9.7.0 h1:bQk8xiVFw+3ln4pfELVktpWgY
github.com/redis/go-redis/extra/redisotel/v9 v9.7.0/go.mod h1:0LyN+GHLIJmKtjYRPF7nHyTTMV6E91YngoOopNifQRo=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
......@@ -555,6 +569,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe/go.mod h1:lKJPbtWzJ9JhsTN1k1gZgleJWY/cqq0psdoMmaThG3w=
......
package xpkg
import (
"context"
"fmt"
"strconv"
"github.com/jinzhu/copier"
"gitlab.wanzhuangkj.com/tush/xpkg/config"
"gitlab.wanzhuangkj.com/tush/xpkg/database"
"gitlab.wanzhuangkj.com/tush/xpkg/eventbus"
"gitlab.wanzhuangkj.com/tush/xpkg/ips"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
"gitlab.wanzhuangkj.com/tush/xpkg/rd/nacos"
"gitlab.wanzhuangkj.com/tush/xpkg/stat"
"gitlab.wanzhuangkj.com/tush/xpkg/tracer"
)
func Init() error {
conf := config.Cfg
if err := initLogger(conf); err != nil {
return err
}
if conf.App.EnableTrace {
tracer.InitWithConfig(
conf.App.Name,
conf.App.Env,
conf.App.Version,
conf.Jaeger.AgentHost,
strconv.Itoa(conf.Jaeger.AgentPort),
conf.App.TracingSamplingRate,
)
logger.Info("[tracer] initialized")
}
if conf.App.EnableStat {
stat.Init(
stat.WithLog(logger.Get()), stat.WithAlarm(),
stat.WithPrintField(logger.String("service_name", conf.App.Name), logger.String("host", conf.App.Host)),
)
logger.Info("[resource statistics] initialized")
}
ctx := context.TODO()
if err := database.InitDB(); err != nil {
return err
}
eventbus.Eb.Publish(ctx, eventbus.TopicDBInitFinish)
logger.Info("[database] initialized")
database.InitCache(conf.App.CacheType)
eventbus.Eb.Publish(ctx, eventbus.TopicCacheInitFinish)
if conf.App.CacheType != "" {
logger.Info(fmt.Sprintf("[%s] initialized", conf.App.CacheType))
}
rdt := conf.App.RegistryDiscoveryType
if rdt == "nacos" {
localhost := ips.GetLocalHost()
config.Write(func(c *config.Config) {
c.Nacos.Rd.ServiceName = conf.App.Name
c.Nacos.Rd.IP = localhost
c.Nacos.Rd.Port = conf.HTTP.Port
})
config.Read(func(c *config.Config) {
copier.Copy(&conf, c)
})
cc := conf.NacosConfClient.GetCC()
sc := conf.NacosConfClient.GetSC()
nnc, err := nacos.New(&conf.Nacos, cc, sc)
if err != nil {
return err
}
if err := nnc.Register(); err != nil {
return err
}
config.Write(func(c *config.Config) {
c.NacosNamingClient = nnc
})
}
eventbus.Eb.Publish(ctx, eventbus.TopicCoreInitFinish)
return nil
}
func initLogger(cfg *config.Config) (err error) {
var loggerOpts []logger.Option
loggerOpts = append(loggerOpts, logger.WithLevel(cfg.Logger.Level), logger.WithFormat(cfg.Logger.Format))
if cfg.Logger.IsSave {
opt := logger.WithSave(
cfg.Logger.IsSave,
logger.WithFileName(cfg.Logger.LogFileConfig.Filename),
logger.WithFileMaxSize(cfg.Logger.LogFileConfig.MaxSize),
logger.WithFileMaxBackups(cfg.Logger.LogFileConfig.MaxBackups),
logger.WithFileMaxAge(cfg.Logger.LogFileConfig.MaxAge),
logger.WithFileIsCompression(cfg.Logger.LogFileConfig.IsCompression),
)
loggerOpts = append(loggerOpts, opt)
} else {
opt := logger.WithSave(cfg.Logger.IsSave)
loggerOpts = append(loggerOpts, opt)
}
_, err = logger.Init(loggerOpts...)
if err != nil {
return err
}
logger.Debug(config.Show())
logger.Info("[logger] initialized")
return nil
}
......@@ -120,6 +120,10 @@ func NewCtx(ctx context.Context) context.Context {
return context.WithValue(context.Background(), ContextTraceIDKey, ctx.Value(ContextTraceIDKey))
}
func GetGinUserName(c *gin.Context) string {
return getGinVal[string](c, KeyUName)
}
func GetGinUserID(c *gin.Context) xsf.ID {
return getGinVal[xsf.ID](c, KeyUID)
}
......
package jsonUtils
import (
"bytes"
"encoding/json"
"errors"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
)
func ToJsonString(v any) string {
......@@ -24,3 +27,52 @@ func Unmarshal(data []byte, v any) error {
}
return nil
}
func MarshalEscapeHTML(data interface{}) ([]byte, error) {
bf := bytes.NewBuffer([]byte{})
jsonEncoder := json.NewEncoder(bf)
jsonEncoder.SetEscapeHTML(false)
if err := jsonEncoder.Encode(data); err != nil {
return nil, xerror.Wrap(err, "json marshalEscapeHTML")
}
return bf.Bytes(), nil
}
func MarshalEscapeHTMLSilent(data interface{}) []byte {
d, _ := MarshalEscapeHTML(data)
return d
}
func Marshal(data any) ([]byte, error) {
if data == nil {
return nil, nil
}
d, err := json.Marshal(data)
if err != nil {
return nil, xerror.Wrap(err, "json marshal")
}
return d, nil
}
func MarshalSilent(data any) []byte {
d, _ := Marshal(data)
return d
}
func MarshalIndentString(data any) string {
d, _ := json.MarshalIndent(data, "", " ")
return string(d)
}
func ToJSONString(data interface{}) string {
if data == nil {
return ""
}
d, _ := json.Marshal(data)
return string(d)
}
func ToJSONStringPtr(data interface{}) *string {
v := ToJSONString(data)
return &v
}
package redSyncUtils
import (
"context"
"fmt"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/ctxUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
)
var (
redSync *redsync.Redsync
)
func Init(redisCli *redis.Client) {
pool := goredis.NewPool(redisCli)
redSync = redsync.New(pool)
}
func Sync(ctx context.Context, key string, fn func(), options ...redsync.Option) error {
if key == "" {
return xerror.New("[redSync]key is empty, please check")
}
if fn == nil {
return xerror.New("[redSync]fn is nil, please check")
}
if redSync == nil {
return xerror.New("[redSync]redSync is nil, please init first")
}
mutex := redSync.NewMutex(key, options...)
if err := mutex.LockContext(ctx); err != nil {
logger.Error("[redSync]try get lock fail", logger.Any("err", err), ctxUtils.CtxTraceIDField(ctx))
return xerror.Wrap(err, fmt.Sprintf("[redSync][%s]get lock fail", key))
}
defer func() {
if _, err := mutex.UnlockContext(ctx); err != nil {
logger.Error("[redSync]release lock fail", logger.Any("err", err), ctxUtils.CtxTraceIDField(ctx))
}
}()
fn()
return nil
}
package webLogUtils
import (
"container/list"
"context"
"fmt"
"strings"
"time"
"gitlab.wanzhuangkj.com/tush/xpkg/database"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon/api"
"github.com/gin-gonic/gin"
"github.com/spf13/cast"
"gitlab.wanzhuangkj.com/tush/xpkg/httpcli/entity"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
merge "gitlab.wanzhuangkj.com/tush/xpkg/merger"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/ctxUtils"
pkgCtxUtils "gitlab.wanzhuangkj.com/tush/xpkg/utils/ctxUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/jsonUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/retryUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xtime"
"gorm.io/gorm"
glogger "gorm.io/gorm/logger"
)
func WebLogInterceptor() func(c *gin.Context) {
return func(c *gin.Context) {
var (
req *entity.CopyHttpReq
rsp *entity.CopyHttpRsp
respCode int
)
ctx := pkgCtxUtils.WrapCtx(c)
URL := strings.ToLower(c.Request.Method) + "#" + c.Request.URL.Path
xApi, ok := api.XApi.M[URL]
if ok {
uid := ctxUtils.GetGinUserID(c)
uname := ctxUtils.GetGinUserName(c)
uType := User_UserType_Enum(getUserType(ctx)).Desc()
req = entity.GetCopyReq(c)
if uid > 0 {
defer func() {
if respCode > 0 {
AddAsync(ctx, "", fmt.Sprintf("%s[%s]%s", uType, uname, xApi.Summary), nil, nil)
} else {
AddAsync(ctx, "", fmt.Sprintf("%s[%s]%s", uType, uname, xApi.Summary), req, rsp)
}
}()
} else {
defer func() {
if respCode > 0 {
AddAsync(ctx, "", xApi.Summary, nil, nil)
} else {
AddAsync(ctx, "", xApi.Summary, req, rsp)
}
}()
}
}
c.Next()
if ok {
respCode = c.GetInt(pkgCtxUtils.KeyRspCode)
if respCode <= 0 {
rsp = entity.GetCopyRsp(c)
}
}
}
}
func GetUserID(ctx context.Context) xsf.ID {
val := ctx.Value(ctxUtils.KeyUID)
if val != nil {
if id, ok := val.(xsf.ID); ok {
return id
}
if v, ok := val.(string); ok {
vid, _ := xsf.ParseString(v)
return vid
}
return xsf.ParseInt64(cast.ToInt64(val))
}
return xsf.ParseInt64(0)
}
var merger *merge.FanIn
func Init(schema string, isProd bool) {
duration := 10 * time.Minute
count := 256
if !isProd {
duration = 1 * time.Minute
count = 10
}
merger = merge.NewFanIn(duration, count, func(l *list.List) error {
if l.Len() == 0 {
return nil
}
ctx := context.WithValue(context.Background(), pkgCtxUtils.ContextTraceIDKey, xsf.GenerateID().String())
ors := make([]*WebLog, 0, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
le, ok := e.Value.(*WebLog)
if ok {
ors = append(ors, le)
}
}
if len(ors) == 0 {
return nil
}
insertFn := func() error { return WebLogDao.CreateSliceSilent(ctx, ors) }
go func() {
if err := retryUtils.Retry(insertFn, retryUtils.RetryTimes(10), retryUtils.RetryWithLinearBackoff(32*time.Minute)); err != nil {
logger.Error("[webLog] insert fail", logger.String("err", err.Error()), pkgCtxUtils.CtxTraceIDField(ctx))
for _, or := range ors {
logger.Error("[webLog] insert fail detail", logger.String("user_id", or.UserID.String()), logger.String("operate", or.Operate), pkgCtxUtils.CtxTraceIDField(ctx))
}
}
}()
return nil
})
merger.Start()
WebLogDao.db = database.DB(schema)
}
func AddAsync(ctx context.Context, traceNo, operate string, req *entity.CopyHttpReq, rsp *entity.CopyHttpRsp) {
op := &WebLog{}
op.ID = xsf.GenerateID()
op.UserID = ctxUtils.GetCtxUserID(ctx)
op.TraceID = ctxUtils.GetCtxTid(ctx)
op.Operate = operate
op.CreatedAt = xtime.Now()
if req != nil {
op.Req = jsonUtils.ToJSONStringPtr(req)
}
if rsp != nil {
op.Rsp = jsonUtils.ToJSONStringPtr(rsp)
}
go func() {
merger.Add(op)
}()
}
type webLogDao struct {
db *gorm.DB
}
var WebLogDao = &webLogDao{}
func (x *webLogDao) CreateSliceSilent(ctx context.Context, ors []*WebLog) error {
return x.db.WithContext(ctx).Session(&gorm.Session{Logger: glogger.Discard}).Create(ors).Error
}
type User_UserType_Enum int8
var (
User_UserType_Operator User_UserType_Enum = 2 // 运营商
User_UserType_Share User_UserType_Enum = 3 // 合伙人
User_UserType_Agent User_UserType_Enum = 9 // 代理商
)
func (x User_UserType_Enum) Uint() uint {
return uint(x)
}
func (x User_UserType_Enum) Int() int {
return int(x)
}
func (x User_UserType_Enum) Desc() string {
desc := "未知"
switch x {
case User_UserType_Operator:
desc = "运营商"
case User_UserType_Share:
desc = "合伙人"
case User_UserType_Agent:
desc = "代理商"
}
return desc
}
func getUserType(ctx context.Context) int {
val := ctx.Value(pkgCtxUtils.KeyUType)
if val != nil {
return cast.ToInt(val)
}
return 0
}
type WebLog struct {
ID xsf.ID `json:"id" form:"id" swaggertype:"string" gorm:"column:id;type:bigint(20) unsigned;primaryKey;autoIncrement;comment:主键" example:"101"` //主键
UserID xsf.ID `json:"userID" form:"userID" swaggertype:"string" gorm:"column:user_id;type:bigint(20) unsigned;comment:用户ID" example:"102"` //用户ID
Operate string `json:"operate" form:"operate" gorm:"column:operate;type:varchar(128);comment:操作" example:"用户登出"` //操作
TraceID string `json:"traceID" form:"traceID" gorm:"column:trace_id;type:varchar(64);comment:溯源id" example:"TRACE_ID12345"` //溯源id
Req *string `json:"req" form:"req" gorm:"column:req;type:json;comment:请求" example:""` //请求
Rsp *string `json:"rsp" form:"rsp" gorm:"column:rsp;type:json;comment:响应" example:""` //响应
CreatedAt xtime.DateTime `json:"createdAt" form:"createdAt" gorm:"column:created_at;type:datetime;comment:创建时间" example:"2025-01-07 12:20:43"` //创建时间
}
func (x WebLog) GetID() xsf.ID {
return x.ID
}
const TBWebLog = "web_log"
func (WebLog) TableName() string {
return TBWebLog
}
func (x WebLog) GetOrder() string {
return "id asc"
}
func (x *WebLog) BeforeCreate(tx *gorm.DB) (err error) {
if x.CreatedAt.IsZero() {
x.CreatedAt = xtime.Now()
}
return nil
}
func (x *WebLog) BeforeUpdate(tx *gorm.DB) (err error) {
return nil
}
package api
import (
_ "embed"
"encoding/json"
"github.com/swaggo/swag"
)
func initApiM(xapi *swag.Spec) error {
swaggerJson := xapi.ReadDoc()
spec := &swagDocs{}
_ = json.Unmarshal([]byte(swaggerJson), spec)
var apis []*Api
for path, methods := range spec.Paths {
for method, details := range methods {
api := &Api{}
for key, val := range details {
api.URL = path
api.Method = method
switch key {
case "summary":
api.Summary, _ = val.(string)
case "description":
api.Description, _ = val.(string)
case "tags":
api.Tags, _ = val.([]string)
case "consumes":
api.Consumes, _ = val.([]string)
case "produces":
api.Produces, _ = val.([]string)
}
}
apis = append(apis, api)
}
}
if len(apis) == 0 {
return nil
}
for _, api := range apis {
XApi.M[api.Method+"#"+api.URL] = api
}
return nil
}
type Info struct {
Description string `json:"description"`
Title string `json:"title"`
Contact Contact `json:"contact"`
Version string `json:"version"`
}
type Contact struct {
Name string `json:"name"`
Url string `json:"url"`
Email string `json:"email"`
}
type swagDocs struct {
Schemes string `json:"schemes"`
Swagger string `json:"swagger"`
Info Info `json:"info"`
Host string `json:"host"`
BasePath string `json:"basePath"`
Paths map[string]map[string]map[string]any `json:"paths"`
SecurityDefinitions SecurityDefinitions `json:"securityDefinitions"`
}
type Api struct {
URL string `json:"url"`
Method string `json:"method"`
Summary string `json:"summary"`
Description string `json:"description"`
Tags []string `json:"tags"`
Consumes []string `json:"consumes"`
Produces []string `json:"produces"`
}
type SecurityDefinitions struct {
Bearer Bearer `json:"Bearer"`
}
type Bearer struct {
Type string `json:"type"`
Name string `json:"name"`
In string `json:"in"`
}
type apiMap struct {
M map[string]*Api // map[URL]*Api ,URL = method#uri
}
var (
XApi = apiMap{
M: make(map[string]*Api),
}
)
package api
import "github.com/swaggo/swag"
var xapi *swag.Spec
func Init(apiDoc *swag.Spec) (err error) {
xapi = apiDoc
return initApiM(xapi)
}
package base
import "strings"
type Condition interface {
SetWhere(k string, v []interface{})
SetOr(k string, v []interface{})
SetOrder(k string)
SetJoinOn(t, on string) Condition
}
type GormCondition struct {
GormPublic
Join []*GormJoin
}
type GormPublic struct {
Where map[string][]interface{}
Order []string
Or map[string][]interface{}
}
type GormJoin struct {
Type string
JoinOn string
GormPublic
}
func (e *GormJoin) SetJoinOn(t, on string) Condition {
return nil
}
func (e *GormPublic) SetWhere(k string, v []interface{}) {
if e.Where == nil {
e.Where = make(map[string][]interface{})
}
e.Where[k] = v
}
func (e *GormPublic) SetOr(k string, v []interface{}) {
if e.Or == nil {
e.Or = make(map[string][]interface{})
}
e.Or[k] = v
}
func (e *GormPublic) SetOrder(k string) {
if e.Order == nil {
e.Order = make([]string, 0)
}
e.Order = append(e.Order, k)
}
func (e *GormCondition) SetJoinOn(t, on string) Condition {
if e.Join == nil {
e.Join = make([]*GormJoin, 0)
}
join := &GormJoin{
Type: t,
JoinOn: on,
GormPublic: GormPublic{},
}
e.Join = append(e.Join, join)
return join
}
type resolveSearchTag struct {
Type string
Column string
Table string
On []string
Join string
}
// makeTag 解析search的tag标签
func makeTag(tag string) *resolveSearchTag {
r := &resolveSearchTag{}
tags := strings.Split(tag, ";")
var ts []string
for _, t := range tags {
ts = strings.Split(t, ":")
if len(ts) == 0 {
continue
}
switch ts[0] {
case "type":
if len(ts) > 1 {
r.Type = ts[1]
}
case "column":
if len(ts) > 1 {
r.Column = ts[1]
}
case "table":
if len(ts) > 1 {
r.Table = ts[1]
}
case "on":
if len(ts) > 1 {
r.On = ts[1:]
}
case "join":
if len(ts) > 1 {
r.Join = ts[1]
}
}
}
return r
}
差异被折叠。
差异被折叠。
package xcommon
import (
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xtype"
)
type IConverter[ID xtype.Key, T, B any] interface {
NewFromModel(a *T) *B
GetID() ID
}
type CommonBiz[ID xtype.Key, T any, B IConverter[ID, T, B]] struct {
}
func (x *CommonBiz[ID, T, B]) NewSliceFromModelsBiz(ms []*T) []*B {
if len(ms) == 0 {
return nil
}
ret := make([]*B, 0, len(ms))
var bt B
for _, r := range ms {
ret = append(ret, bt.NewFromModel(r))
}
return ret
}
func (x *CommonBiz[ID, T, B]) NewMapFromModelMapBiz(idMap map[xsf.ID]*T) map[xsf.ID]*B {
if len(idMap) == 0 {
return nil
}
ret := make(map[xsf.ID]*B, len(idMap))
var bt B
for k, v := range idMap {
ret[k] = bt.NewFromModel(v)
}
return ret
}
func (x *CommonBiz[ID, T, B]) NewMapSliceFromModelMapSliceBiz(idMapSlice map[xsf.ID][]*T) map[xsf.ID][]*B {
if len(idMapSlice) == 0 {
return nil
}
ret := make(map[xsf.ID][]*B, len(idMapSlice))
for k, v := range idMapSlice {
ret[k] = x.NewSliceFromModelsBiz(v)
}
return ret
}
func (x *CommonBiz[ID, T, B]) NewStrMapSliceFromModelMapSliceBiz(strMapSlice map[string][]*T) map[string][]*B {
if len(strMapSlice) == 0 {
return nil
}
ret := make(map[string][]*B, len(strMapSlice))
for k, v := range strMapSlice {
ret[k] = x.NewSliceFromModelsBiz(v)
}
return ret
}
func (x *CommonBiz[ID, T, B]) SliceToIDMapBiz(rs []*B) map[ID]*B {
if len(rs) == 0 {
return nil
}
ret := make(map[ID]*B, len(rs))
for _, r := range rs {
ret[(*r).GetID()] = r
}
return ret
}
func (x *CommonBiz[ID, T, B]) IDMapToSliceBiz(m map[xsf.ID]*B) []*B {
if len(m) == 0 {
return nil
}
ret := make([]*B, 0, len(m))
for _, v := range m {
ret = append(ret, v)
}
return ret
}
func (x *CommonBiz[ID, T, B]) IDMapSliceToSliceBiz(m map[xsf.ID][]*B) []*B {
if len(m) == 0 {
return nil
}
ret := make([]*B, 0)
for _, v := range m {
ret = append(ret, v...)
}
return ret
}
package xcommon
import (
"net/http"
"github.com/gin-gonic/gin"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon/validator"
)
type IValid interface {
Valid() error
}
type Controller struct {
}
func NewController() *Controller {
return &Controller{}
}
func (e *Controller) Bind(c *gin.Context, req interface{}) error {
switch c.Request.Method {
case http.MethodGet, http.MethodDelete:
if err := c.ShouldBindQuery(req); err != nil {
if err.Error() == "EOF" {
err = nil
}
return err
}
default:
if err := c.ShouldBind(req); err != nil {
if err.Error() == "EOF" {
err = nil
}
return err
}
}
if err := validator.Validate(c, req); err != nil {
return err
}
if valid, ok := req.(IValid); ok {
if err := valid.Valid(); err != nil {
return err
}
}
c.Set("reqbody", req)
return nil
}
//
//func (x *Controller) GetUser(ctx context.Context) *biz.User {
// val := ctx.Value(ctxUtils.KeyUser)
// if val != nil {
// if u, ok := val.(*biz.User); ok {
// return u
// }
// }
// return nil
//}
package xcommon
import (
"github.com/swaggo/swag"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon/api"
)
func Init(apiSpec *swag.Spec) (err error) {
if err := api.Init(apiSpec); err != nil {
return err
}
return nil
}
package ocache
import (
"bytes"
"context"
"strings"
"time"
"gitlab.wanzhuangkj.com/tush/xpkg/database"
"gitlab.wanzhuangkj.com/tush/xpkg/cache"
"gitlab.wanzhuangkj.com/tush/xpkg/encoding"
"gitlab.wanzhuangkj.com/tush/xpkg/utils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
)
const (
Prefix = "r:operator:"
DefaultExpireTime = 10 * time.Minute
)
var JsonEncoding = encoding.JSONEncoding{}
func getCacheKey(prefix string, id int64) string {
return prefix + utils.Int64ToStr(id)
}
type IID interface {
GetID() xsf.ID
TableName() string
}
type OCache[T IID] struct {
Cache cache.Cache
KeyPrefix string
ExpireTime time.Duration
}
func NewCache[T IID](cacheType string, prefix string, d time.Duration) *OCache[T] {
cachePrefix := ""
cType := strings.ToLower(cacheType)
if d <= 0 {
d = DefaultExpireTime
}
switch cType {
case cache.CacheType_Redis:
c := cache.NewRedisCache(database.GetRedisCli(), cachePrefix, JsonEncoding, func() interface{} {
var t T
return &t
})
return &OCache[T]{Cache: c, KeyPrefix: prefix, ExpireTime: d}
case cache.CacheType_Memory:
c := cache.NewMemoryCache(database.GetMemoryCache(), cachePrefix, JsonEncoding, func() interface{} {
var t T
return &t
})
return &OCache[T]{Cache: c, KeyPrefix: prefix, ExpireTime: d}
}
return nil
}
func (x *OCache[T]) Set(ctx context.Context, id xsf.ID, data *T, duration time.Duration) error {
if x.Cache == nil {
return nil
}
if data == nil || id == 0 {
return nil
}
cacheKey := getCacheKey(x.KeyPrefix, id.Int64())
err := x.Cache.Set(ctx, cacheKey, data, duration)
if err != nil {
return xerror.New(err.Error())
}
return nil
}
func (x *OCache[T]) Get(ctx context.Context, id xsf.ID) (*T, error) {
if x.Cache == nil {
return nil, nil
}
var data *T
cacheKey := getCacheKey(x.KeyPrefix, id.Int64())
err := x.Cache.Get(ctx, cacheKey, &data)
if err != nil {
return nil, xerror.New(err.Error())
}
return data, nil
}
func (x *OCache[T]) MultiSet(ctx context.Context, data []*T, duration time.Duration) error {
if x.Cache == nil {
return nil
}
valMap := make(map[string]interface{})
for _, v := range data {
cacheKey := getCacheKey(x.KeyPrefix, (*v).GetID().Int64())
valMap[cacheKey] = v
}
if err := x.Cache.MultiSet(ctx, valMap, duration); err != nil {
return xerror.New(err.Error())
}
return nil
}
func (x *OCache[T]) MultiGet(ctx context.Context, ids []xsf.ID) (map[xsf.ID]int, map[xsf.ID]*T, error) {
if x.Cache == nil {
return nil, nil, nil
}
if len(ids) == 0 {
return nil, nil, nil
}
var keys []string
for _, id := range ids {
cacheKey := x.KeyPrefix + utils.Int64ToStr(id.Int64())
keys = append(keys, cacheKey)
}
items, err := x.Cache.MGet(ctx, keys)
if err != nil {
return nil, nil, xerror.New(err.Error())
}
retMap := make(map[xsf.ID]*T)
hasMap := make(map[xsf.ID]int)
for i, id := range ids {
item := items[i]
hasMap[id] = 0
if dataBytes, ok := item.([]byte); ok {
if len(dataBytes) == 0 || bytes.Equal(dataBytes, cache.NotFoundPlaceholderBytes) {
hasMap[id] = 1
}
var record T
if err = JsonEncoding.Unmarshal(dataBytes, &record); err == nil {
hasMap[id] = 2
retMap[id] = &record
}
}
}
return hasMap, retMap, nil
}
func (x *OCache[T]) Del(ctx context.Context, id xsf.ID) error {
cacheKey := x.KeyPrefix + utils.Int64ToStr(id.Int64())
err := x.Cache.Del(ctx, cacheKey)
if err != nil {
return xerror.New(err.Error())
}
return nil
}
func (x *OCache[T]) Dels(ctx context.Context, ids []xsf.ID) error {
if len(ids) == 0 {
return nil
}
var keys []string
for _, id := range ids {
cacheKey := x.KeyPrefix + utils.Int64ToStr(id.Int64())
keys = append(keys, cacheKey)
}
err := x.Cache.Del(ctx, keys...)
if err != nil {
return xerror.New(err.Error())
}
return nil
}
func (x *OCache[T]) SetPlaceholder(ctx context.Context, id xsf.ID) error {
cacheKey := x.KeyPrefix + utils.Int64ToStr(id.Int64())
err := x.Cache.SetCacheWithNotFound(ctx, cacheKey)
if err != nil {
return xerror.New(err.Error())
}
return nil
}
func (x *OCache[T]) IsPlaceholderErr(err error) bool {
return xerror.Is(err, cache.ErrPlaceholder)
}
package odao
import "strings"
type Condition interface {
SetWhere(k string, v []interface{})
SetOr(k string, v []interface{})
SetOrder(k string)
SetJoinOn(t, on string) Condition
}
type GormCondition struct {
GormPublic
Join []*GormJoin
}
type GormPublic struct {
Where map[string][]interface{}
Order []string
Or map[string][]interface{}
}
type GormJoin struct {
Type string
JoinOn string
GormPublic
}
func (e *GormJoin) SetJoinOn(t, on string) Condition {
return nil
}
func (e *GormPublic) SetWhere(k string, v []interface{}) {
if e.Where == nil {
e.Where = make(map[string][]interface{})
}
e.Where[k] = v
}
func (e *GormPublic) SetOr(k string, v []interface{}) {
if e.Or == nil {
e.Or = make(map[string][]interface{})
}
e.Or[k] = v
}
func (e *GormPublic) SetOrder(k string) {
if e.Order == nil {
e.Order = make([]string, 0)
}
e.Order = append(e.Order, k)
}
func (e *GormCondition) SetJoinOn(t, on string) Condition {
if e.Join == nil {
e.Join = make([]*GormJoin, 0)
}
join := &GormJoin{
Type: t,
JoinOn: on,
GormPublic: GormPublic{},
}
e.Join = append(e.Join, join)
return join
}
type resolveSearchTag struct {
Type string
Column string
Table string
On []string
Join string
}
// makeTag 解析search的tag标签
func makeTag(tag string) *resolveSearchTag {
r := &resolveSearchTag{}
tags := strings.Split(tag, ";")
var ts []string
for _, t := range tags {
ts = strings.Split(t, ":")
if len(ts) == 0 {
continue
}
switch ts[0] {
case "type":
if len(ts) > 1 {
r.Type = ts[1]
}
case "column":
if len(ts) > 1 {
r.Column = ts[1]
}
case "table":
if len(ts) > 1 {
r.Table = ts[1]
}
case "on":
if len(ts) > 1 {
r.On = ts[1:]
}
case "join":
if len(ts) > 1 {
r.Join = ts[1]
}
}
}
return r
}
差异被折叠。
差异被折叠。
package xcommon
import (
"bytes"
"context"
"encoding/csv"
)
type Service struct{}
func NewService() *Service {
return &Service{}
}
func (x *Service) PageQuery(pageSize int, cb func(pageSize, offset int, size *int) error) error {
pageIndex := 1
size := 0
if pageSize == 0 {
pageSize = 1000
}
for {
offset := (pageIndex - 1) * pageSize
if err := cb(pageSize, offset, &size); err != nil {
return err
}
if size == 0 || size < pageSize {
break
}
pageIndex++
}
return nil
}
func (x *Service) NewCsvFile(ctx context.Context, buff *bytes.Buffer, comma ...rune) *csv.Writer {
buff.WriteString("\xEF\xBB\xBF")
writer := csv.NewWriter(buff)
if len(comma) > 0 {
writer.Comma = comma[0]
}
return writer
}
package validator
import (
"mime/multipart"
"reflect"
"strings"
"github.com/dlclark/regexp2"
"github.com/gin-gonic/gin"
"github.com/go-playground/locales/en"
"github.com/go-playground/locales/zh"
ut "github.com/go-playground/universal-translator"
"github.com/go-playground/validator/v10"
en_translations "github.com/go-playground/validator/v10/translations/en"
zh_translations "github.com/go-playground/validator/v10/translations/zh"
)
// Translation 设置Translation
func Translation() gin.HandlerFunc {
en := en.New()
zh := zh.New()
universalTranslator := ut.New(zh, zh, en)
valid := validator.New()
_ = valid.RegisterValidation("mobile", func(fl validator.FieldLevel) bool {
regex := regexp2.MustCompile(`^1[3456789]\d{9}$`, 0)
phone := fl.Field().String()
matched, err := regex.MatchString(phone)
if err != nil {
return false
}
return matched
})
//自定义电话验证方法
_ = valid.RegisterValidation("phone", func(fl validator.FieldLevel) bool {
regex := regexp2.MustCompile(`^(\d{3,4}-)?\d{6,11}$`, 0)
phone := fl.Field().String()
matched, err := regex.MatchString(phone)
if err != nil {
return false
}
return matched
})
//自定义图片验证方法
_ = valid.RegisterValidation("image", func(fl validator.FieldLevel) bool {
file := fl.Field().Interface()
if fileHeader, ok := file.(multipart.FileHeader); ok {
contentType := fileHeader.Header.Get("content-type")
for _, t := range []string{"image/jpeg", "image/png", "image/gif"} {
if strings.HasPrefix(contentType, t) {
return true
}
}
}
return false
})
return func(c *gin.Context) {
//根据参数取翻译器实例
locale := c.DefaultQuery("locale", "zh")
trans, _ := universalTranslator.GetTranslator(locale)
switch locale {
case "en":
_ = en_translations.RegisterDefaultTranslations(valid, trans)
valid.RegisterTagNameFunc(func(fld reflect.StructField) string {
return fld.Tag.Get("encomment")
})
default:
_ = zh_translations.RegisterDefaultTranslations(valid, trans)
valid.RegisterTagNameFunc(func(fld reflect.StructField) string {
return fld.Tag.Get("comment")
})
_ = valid.RegisterTranslation("mobile", trans, func(ut ut.Translator) error {
return ut.Add("mobile", "{0}填写不正确", true)
}, func(ut ut.Translator, fe validator.FieldError) string {
t, _ := ut.T("mobile", fe.Field())
return t
})
_ = valid.RegisterTranslation("phone", trans, func(ut ut.Translator) error {
return ut.Add("phone", "{0}填写不正确", true)
}, func(ut ut.Translator, fe validator.FieldError) string {
t, _ := ut.T("phone", fe.Field())
return t
})
_ = valid.RegisterTranslation("image", trans, func(ut ut.Translator) error {
return ut.Add("image", "请上传{0}", true)
}, func(ut ut.Translator, fe validator.FieldError) string {
t, _ := ut.T("image", fe.Field())
return t
})
}
c.Set(CtxKey_Translator, trans)
c.Set(CtxKey_Validator, valid)
c.Next()
}
}
package validator
import (
"errors"
"fmt"
"github.com/gin-gonic/gin"
ut "github.com/go-playground/universal-translator"
"github.com/go-playground/validator/v10"
"gitlab.wanzhuangkj.com/tush/xpkg/errcode"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
)
var (
InvalidParams = errcode.InvalidParams
)
const (
CtxKey_Validator = "validator"
CtxKey_Translator = "translator"
)
func Validate(c *gin.Context, params any) error {
valid, _ := getValidator(c)
if valid != nil {
trans, _ := getTranslation(c)
if err := valid.Struct(params); err != nil {
if errs, ok := err.(validator.ValidationErrors); ok {
for _, e := range errs {
if trans != nil {
return xerror.NewC(InvalidParams.Code(), fmt.Sprintf("参数错误:%s", e.Translate(trans)))
} else {
return xerror.NewC(InvalidParams.Code(), fmt.Sprintf("参数错误:%s", e.Error()))
}
}
} else {
return err
}
}
}
return nil
}
func getValidator(c *gin.Context) (*validator.Validate, error) {
val, ok := c.Get(CtxKey_Validator)
if !ok {
return nil, errors.New("未设置验证器")
}
vd, ok := val.(*validator.Validate)
if !ok {
return nil, errors.New("获取验证器失败")
}
return vd, nil
}
func getTranslation(c *gin.Context) (ut.Translator, error) {
trans, ok := c.Get(CtxKey_Translator)
if !ok {
return nil, errors.New("未设置翻译器")
}
translator, ok := trans.(ut.Translator)
if !ok {
return nil, errors.New("获取翻译器失败")
}
return translator, nil
}
package xmodels
import (
"gitlab.wanzhuangkj.com/tush/xpkg/utils/ctxUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/setUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gorm.io/gorm"
)
func GetUid(tx *gorm.DB) xsf.ID {
ctx := tx.Statement.Context
return ctxUtils.GetCtxUserID(ctx)
}
type IModel interface {
GetID() xsf.ID
}
func GetIDs[T IModel](rs []*T) []xsf.ID {
st := setUtils.NewSet[xsf.ID]()
for _, v := range rs {
st.Add((*v).GetID())
}
return st.Slice()
}
func StrMapToSlice[T any](m map[string]*T) []*T {
var rs []*T
for _, r := range m {
rs = append(rs, r)
}
return rs
}
func GetStrMapIDs[T IModel](m map[string]*T) []xsf.ID {
st := setUtils.NewSet[xsf.ID]()
for _, v := range m {
st.Add((*v).GetID())
}
return st.Slice()
}
func IDMapToSlice[T any](m map[xsf.ID]*T) []*T {
var rs []*T
for _, r := range m {
rs = append(rs, r)
}
return rs
}
func GetIDMapIDs[T IModel](m map[xsf.ID]*T) []xsf.ID {
st := setUtils.NewSet[xsf.ID]()
for _, v := range m {
st.Add((*v).GetID())
}
return st.Slice()
}
func SliceToIDMap[T IModel](rs []*T) map[xsf.ID]*T {
m := make(map[xsf.ID]*T)
for _, r := range rs {
m[(*r).GetID()] = r
}
return m
}
func IDMapSliceToSlice[T any](m map[xsf.ID][]*T) []*T {
var rs []*T
for _, r := range m {
rs = append(rs, r...)
}
return rs
}
func GetIDMapSliceIDs[T IModel](m map[xsf.ID][]*T) []xsf.ID {
st := setUtils.NewSet[xsf.ID]()
for _, rs := range m {
for _, r := range rs {
st.Add((*r).GetID())
}
}
return st.Slice()
}
package biz
import (
"context"
"encoding/json"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/sliceUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xtime"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon"
)
type CronJob struct {
models.CronJob
xcommon.CommonBiz[xsf.ID, models.CronJob, CronJob]
CronJobInfo *models.CronJobInfo
}
type cronJobInfo struct {
Before string `json:"before"`
}
func (x *CronJob) ParseJobInfo() {
if x.Info != "" {
cji := &cronJobInfo{}
json.Unmarshal([]byte(x.Info), cji)
d, _ := xtime.ParseExtendedDuration(cji.Before)
x.CronJobInfo = &models.CronJobInfo{
Before: d,
}
}
}
var CronJobTool = &CronJob{}
func (*CronJob) GetIDs(rs []*CronJob) []xsf.ID {
return sliceUtils.GetIDs(rs)
}
type CronJobOpts struct {
}
func (CronJob) NewFromModel(m *models.CronJob) *CronJob {
if m == nil {
return nil
}
return &CronJob{
CronJob: *m,
}
}
func (x *CronJob) ToModel() *models.CronJob {
if x == nil {
return nil
}
return &x.CronJob
}
func (x CronJob) Valid(ctx context.Context, cronJob *CronJob, opts *CronJobOpts) error {
if cronJob == nil {
return nil
}
if opts == nil {
opts = &CronJobOpts{}
}
return nil
}
func (x CronJob) ValidSlice(ctx context.Context, cronJobBizSlice []*CronJob, opts *CronJobOpts) error {
if len(cronJobBizSlice) == 0 {
return nil
}
if opts == nil {
opts = &CronJobOpts{}
}
return nil
}
package biz
import (
"context"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/setUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/sliceUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
)
type CronJobLog struct {
models.CronJobLog
xcommon.CommonBiz[xsf.ID, models.CronJobLog, CronJobLog]
}
var CronJobLogTool = &CronJobLog{}
func (*CronJobLog) GetIDs(rs []*CronJobLog) []xsf.ID {
return sliceUtils.GetIDs(rs)
}
func (x *CronJobLog) GetJobIDs(rs []*CronJobLog) []xsf.ID {
set := setUtils.NewSet[xsf.ID]()
for _, r := range rs {
set.Add(r.GetJobID())
}
return set.Slice()
}
type CronJobLogOpts struct {
}
func (CronJobLog) NewFromModel(m *models.CronJobLog) *CronJobLog {
if m == nil {
return nil
}
return &CronJobLog{
CronJobLog: *m,
}
}
func (x *CronJobLog) ToModel() *models.CronJobLog {
if x == nil {
return nil
}
return &x.CronJobLog
}
func (x CronJobLog) Valid(ctx context.Context, cronJobLog *CronJobLog, opts *CronJobLogOpts) error {
if cronJobLog == nil {
return nil
}
if opts == nil {
opts = &CronJobLogOpts{}
}
return nil
}
func (x CronJobLog) ValidSlice(ctx context.Context, cronJobLogBizSlice []*CronJobLog, opts *CronJobLogOpts) error {
if len(cronJobLogBizSlice) == 0 {
return nil
}
if opts == nil {
opts = &CronJobLogOpts{}
}
return nil
}
package biz
import (
"context"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/setUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/sliceUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon"
)
type CronJobRecord struct {
models.CronJobRecord
xcommon.CommonBiz[xsf.ID, models.CronJobRecord, CronJobRecord]
}
var CronJobRecordTool = &CronJobRecord{}
func (*CronJobRecord) GetIDs(rs []*CronJobRecord) []xsf.ID {
return sliceUtils.GetIDs(rs)
}
func (x *CronJobRecord) GetJobIDs(rs []*CronJobRecord) []xsf.ID {
set := setUtils.NewSet[xsf.ID]()
for _, r := range rs {
set.Add(r.GetJobID())
}
return set.Slice()
}
type CronJobRecordOpts struct {
}
func (CronJobRecord) NewFromModel(m *models.CronJobRecord) *CronJobRecord {
if m == nil {
return nil
}
return &CronJobRecord{
CronJobRecord: *m,
}
}
func (x *CronJobRecord) ToModel() *models.CronJobRecord {
if x == nil {
return nil
}
return &x.CronJobRecord
}
func (x CronJobRecord) Valid(ctx context.Context, cronJobRecord *CronJobRecord, opts *CronJobRecordOpts) error {
if cronJobRecord == nil {
return nil
}
if opts == nil {
opts = &CronJobRecordOpts{}
}
return nil
}
func (x CronJobRecord) ValidSlice(ctx context.Context, cronJobRecordBizSlice []*CronJobRecord, opts *CronJobRecordOpts) error {
if len(cronJobRecordBizSlice) == 0 {
return nil
}
if opts == nil {
opts = &CronJobRecordOpts{}
}
return nil
}
package cache
import (
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon/ocache"
)
const (
cronJobCachePrefixKey = ocache.Prefix + "cronJob:"
)
type CronJobCache ocache.OCache[models.CronJob]
func NewCronJobCache(cacheType string) *ocache.OCache[models.CronJob] {
return ocache.NewCache[models.CronJob](cacheType, cronJobCachePrefixKey, ocache.DefaultExpireTime)
}
package xcron
import (
"context"
"errors"
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/go-redsync/redsync/v4"
"gitlab.wanzhuangkj.com/tush/xpkg/config"
"gitlab.wanzhuangkj.com/tush/xpkg/eventbus"
"gitlab.wanzhuangkj.com/tush/xpkg/gin/response"
"gitlab.wanzhuangkj.com/tush/xpkg/gocron"
"gitlab.wanzhuangkj.com/tush/xpkg/logger"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/ctxUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/redSyncUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/sliceUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xtime"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/biz"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/dao"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/enums"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/service"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/types"
)
const (
PWD = "1937396767217688576"
)
type cronJobController struct{}
var Instance cronJobController
func (x *cronJobController) StartByCode(c *gin.Context) {
req := &types.CronJobCodeReq{}
if err := c.ShouldBind(req); err != nil {
response.Error(c, err)
return
}
if req.Pwd != PWD {
response.Error(c, errors.New("pwd err"))
return
}
ctx := ctxUtils.WrapCtx(c)
job, err := service.CronJobService.GetByJobCode(ctx, req.JobCode, &biz.CronJobOpts{})
if err != nil {
response.Error(c, err)
return
}
if err := service.CronJobService.StartByID(ctx, job.ID, nil); err != nil {
response.Error(c, err)
return
}
gocron.Add(&gocron.Task{
TimeSpec: job.TimeSpec,
Name: job.JobCode,
Fn: func() {
ExecJob(context.TODO(), job)
},
})
response.Success(c)
}
func (x *cronJobController) StopByCode(c *gin.Context) {
req := &types.CronJobCodeReq{}
if err := c.ShouldBind(req); err != nil {
response.Error(c, err)
return
}
if req.Pwd != PWD {
response.Error(c, errors.New("pwd err"))
return
}
ctx := ctxUtils.WrapCtx(c)
job, err := service.CronJobService.GetByJobCode(ctx, req.JobCode, &biz.CronJobOpts{})
if err != nil {
response.Error(c, err)
return
}
if err := service.CronJobService.StopByID(ctx, job.ID, nil); err != nil {
response.Error(c, err)
return
}
gocron.DeleteTask(job.JobCode)
response.Success(c)
}
func (x *cronJobController) CallOnceByCode(c *gin.Context) {
req := &types.CronJobCodeReq{}
if err := c.ShouldBind(req); err != nil {
response.Error(c, err)
return
}
if req.Pwd != PWD {
response.Error(c, errors.New("pwd err"))
return
}
job, err := service.CronJobService.GetByJobCode(ctxUtils.WrapCtx(c), req.JobCode, &biz.CronJobOpts{})
if err != nil {
response.Error(c, err)
return
}
err = ExecJob(ctxUtils.WrapCtx(c), job)
if err != nil {
response.Error(c, err)
return
}
response.Success(c)
}
func Init(handlers map[string]Fn) error {
ctx := context.TODO()
TaskRegisterMaps = handlers
if !config.CronOpen() {
return nil
}
cronJobs, err := service.CronJobService.GetSliceByEnable(ctx, enums.CronJob_Enable_OPEN, &biz.CronJobOpts{})
if err != nil {
return err
}
if len(cronJobs) == 0 {
return nil
}
cronJobIDs := sliceUtils.GetIDs(cronJobs)
_ = service.CronJobService.UpdateByIDs(ctx, &types.CronJobUpdateByIDsReq{IDs: cronJobIDs, State: enums.CronJob_State_NOT_RUNNING}, nil)
var tasks []*gocron.Task
for _, job := range cronJobs {
job.ParseJobInfo()
_, ok := handlers[job.JobCode]
if ok {
tsk := gocron.Task{
TimeSpec: job.TimeSpec,
Name: job.JobCode,
Fn: func() {
ctx := context.WithValue(ctx, ctxUtils.HeaderXRequestIDKey, ctxUtils.GenerateTid())
if err := ExecJob(ctx, job); err != nil {
logger.Error(fmt.Sprintf("[cron]exec err: %s", err.Error()), logger.Any("name", job.JobName), logger.Any("timeSpec", job.TimeSpec), logger.Any("args", job.Info), ctxUtils.CtxTraceIDField(ctx))
}
},
}
logger.Info("[cron]add cron job", logger.Any("name", job.JobName), logger.Any("timeSpec", job.TimeSpec), logger.Any("args", job.Info), ctxUtils.CtxTraceIDField(ctx))
tasks = append(tasks, &tsk)
}
}
if len(tasks) == 0 {
return nil
}
if err = gocron.Init(gocron.WithLog(logger.Get(), false), gocron.WithGranularity(gocron.SecondType)); err != nil {
return err
}
if err = gocron.Add(tasks...); err != nil {
return err
}
eventbus.Eb.Publish(ctx, eventbus.TopicCronInitFinish)
logger.Info("[cron] initialized", ctxUtils.CtxTraceIDField(ctx))
return nil
}
type Fn func(ctx context.Context, job *biz.CronJob) error
var TaskRegisterMaps = map[string]Fn{}
func ExecJob(ctx context.Context, j *biz.CronJob) error {
if j == nil {
return errors.New("job is nil")
}
_, ok := TaskRegisterMaps[j.JobCode]
if !ok {
return errors.New("handler not found")
}
return redSyncUtils.Sync(ctx, j.JobCode, func() {
if err := execTask(ctx, j); err != nil {
logger.Error(fmt.Sprintf("[cronJob][%s] exec err: %s", j.JobName, err.Error()), ctxUtils.CtxTraceIDField(ctx))
}
},
redsync.WithExpiry(5*time.Minute),
redsync.WithTries(10),
redsync.WithRetryDelayFunc(func(tries int) time.Duration {
return time.Duration(100+tries*20) * time.Millisecond
}),
redsync.WithDriftFactor(0.01),
redsync.WithTimeoutFactor(0.05))
}
func execTask(ctx context.Context, j *biz.CronJob) error {
if j == nil {
return errors.New("job not found")
}
fn, ok := TaskRegisterMaps[j.JobCode]
if !ok {
return errors.New("handler not found")
}
startTime := time.Now()
job, err := service.CronJobService.GetNotRunning(ctx, j.ID)
if err != nil {
return err
}
if job == nil {
return nil
}
logger.Info(fmt.Sprintf("[cronJob][%s] start", j.JobName), ctxUtils.CtxTraceIDField(ctx))
cronJobRecord := &models.CronJobRecord{JobID: job.ID, BeginTime: xtime.Now()}
_ = dao.CronJobRecordDao.CreateSilent(ctx, cronJobRecord)
_ = dao.CronJobLogDao.CreateSilent(ctx, &models.CronJobLog{JobID: job.ID, Info: "job started."})
defer func() {
logger.Info(fmt.Sprintf("[cronJob][%s] end", j.JobName), logger.Any("cost", time.Since(startTime).String()), ctxUtils.CtxTraceIDField(ctx))
_ = dao.CronJobDao.UpdateByIDSilent(ctx, job.ID, &models.CronJob{State: enums.CronJob_State_NOT_RUNNING})
}()
_ = dao.CronJobDao.UpdateByIDSilent(ctx, job.ID, &models.CronJob{State: enums.CronJob_State_RUNNING})
if err = fn(ctx, j); err != nil {
_ = dao.CronJobLogDao.CreateSilent(ctx, &models.CronJobLog{JobID: job.ID, Info: "fail. err:" + err.Error()})
_ = dao.CronJobRecordDao.UpdateByIDSilent(ctx, cronJobRecord.ID, &models.CronJobRecord{EndTime: xtime.Now()})
return err
}
_ = dao.CronJobLogDao.CreateSilent(ctx, &models.CronJobLog{JobID: job.ID, Info: "success."})
_ = dao.CronJobRecordDao.UpdateByIDSilent(ctx, cronJobRecord.ID, &models.CronJobRecord{EndTime: xtime.Now()})
return nil
}
CREATE TABLE `cron_job` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID eg[101]',
`job_name` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '定时任务名称 eg[任务-自动确认订单]',
`job_code` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '定时任务code eg[autoConfirmOrdersTask] mapping[1] [unique]',
`time_spec` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '定时任务时间表达式 eg[0 0 0 * * ?]',
`state` tinyint(1) NOT NULL DEFAULT '1' COMMENT '任务状态 enums[1.RUNNING.运行中 2.NOT_RUNNING.未运行] eg[1]',
`enable` tinyint(1) NOT NULL DEFAULT '1' COMMENT '任务开关 enums[1.CLOSE.关闭 2.OPEN.开启] eg[1]',
`info` text COLLATE utf8mb4_unicode_ci COMMENT '额外信息 eg[]',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间 eg[2025-01-07 12:20:43]',
`updated_at` datetime DEFAULT NULL COMMENT '修改时间 eg[2025-01-07 12:20:43]',
PRIMARY KEY (`id`),
UNIQUE KEY `unq_job_code` (`job_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC COMMENT='定时任务';
CREATE TABLE `cron_job_log` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID eg[101]',
`job_id` bigint(20) unsigned NOT NULL COMMENT 'job ID eg[101]',
`info` text COLLATE utf8mb4_unicode_ci COMMENT '日志 eg[job occur error...]',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间 eg[2025-01-07 12:20:43]',
PRIMARY KEY (`id`),
KEY `idx_jobid_time` (`job_id`,`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC COMMENT='定时任务日志';
CREATE TABLE `cron_job_record` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID eg[101]',
`job_id` bigint(20) unsigned NOT NULL COMMENT 'job ID eg[101]',
`begin_time` datetime DEFAULT NULL COMMENT '开始时间 eg[2025-01-07 12:20:43]',
`end_time` datetime DEFAULT NULL COMMENT '结束时间 eg[2025-01-07 12:20:43]',
`info` text COLLATE utf8mb4_unicode_ci COMMENT '额外信息 eg[]',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间 eg[2025-01-07 12:20:43]',
`updated_at` datetime DEFAULT NULL COMMENT '修改时间 eg[2025-01-07 12:20:43]',
PRIMARY KEY (`id`),
KEY `idx_jobid_time` (`job_id`,`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC COMMENT='定时任务运行记录';
package dao
import (
"context"
"gitlab.wanzhuangkj.com/tush/xpkg/consts"
"gitlab.wanzhuangkj.com/tush/xpkg/database"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon/odao"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/cache"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/enums"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/eventbus"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
"golang.org/x/sync/singleflight"
"gorm.io/gorm"
glogger "gorm.io/gorm/logger"
)
type cronJobDao struct {
*odao.ODao[models.CronJob]
}
var CronJobDao = &cronJobDao{
ODao: &odao.ODao[models.CronJob]{
XDB: &database.XDB{
Schema: consts.DefaultSchema,
},
Sf: new(singleflight.Group),
},
}
func init() {
_ = eventbus.Eb.Subscribe(eventbus.TopicCacheInitFinish, func(ctx context.Context) {
CronJobDao.ODao.Cache = cache.NewCronJobCache(database.GetCacheType())
})
}
func (x *cronJobDao) GetByJobCode(ctx context.Context, jobCode string) (*models.CronJob, error) {
if jobCode == "" {
return nil, nil
}
return x.GetByWhere(ctx, x.DB(), "job_code = ?", jobCode)
}
func (x *cronJobDao) GetSliceByState(ctx context.Context, order string, state enums.CronJob_State_Enum) ([]*models.CronJob, error) {
return x.GetOrderSliceByWhere(ctx, order, "state = ?", state)
}
func (x *cronJobDao) GetSliceByStates(ctx context.Context, order string, states []enums.CronJob_State_Enum) ([]*models.CronJob, error) {
if len(states) == 0 {
return nil, nil
}
return x.GetOrderSliceByWhere(ctx, order, "state IN (?)", states)
}
func (x *cronJobDao) GetMapSliceByStates(ctx context.Context, order string, states []enums.CronJob_State_Enum) (map[enums.CronJob_State_Enum][]*models.CronJob, int, error) {
if len(states) == 0 {
return nil, 0, nil
}
cronJobs, err := x.GetOrderSliceByWhere(ctx, order, "state IN (?)", states)
if err != nil {
return nil, 0, err
}
if len(cronJobs) == 0 {
return nil, 0, nil
}
cronJobMapSlice := make(map[enums.CronJob_State_Enum][]*models.CronJob)
for i := range cronJobs {
cronJobMapSlice[cronJobs[i].State] = append(cronJobMapSlice[cronJobs[i].State], cronJobs[i])
}
return cronJobMapSlice, len(cronJobs), nil
}
func (x *cronJobDao) GetSliceByEnable(ctx context.Context, order string, enable enums.CronJob_Enable_Enum) ([]*models.CronJob, error) {
return x.GetOrderSliceByWhere(ctx, order, "enable = ?", enable)
}
func (x *cronJobDao) GetSliceByEnables(ctx context.Context, order string, enables []enums.CronJob_Enable_Enum) ([]*models.CronJob, error) {
if len(enables) == 0 {
return nil, nil
}
return x.GetOrderSliceByWhere(ctx, order, "enable IN (?)", enables)
}
func (x *cronJobDao) GetMapSliceByEnables(ctx context.Context, order string, enables []enums.CronJob_Enable_Enum) (map[enums.CronJob_Enable_Enum][]*models.CronJob, int, error) {
if len(enables) == 0 {
return nil, 0, nil
}
cronJobs, err := x.GetOrderSliceByWhere(ctx, order, "enable IN (?)", enables)
if err != nil {
return nil, 0, err
}
if len(cronJobs) == 0 {
return nil, 0, nil
}
cronJobMapSlice := make(map[enums.CronJob_Enable_Enum][]*models.CronJob)
for i := range cronJobs {
cronJobMapSlice[cronJobs[i].Enable] = append(cronJobMapSlice[cronJobs[i].Enable], cronJobs[i])
}
return cronJobMapSlice, len(cronJobs), nil
}
func (x *cronJobDao) GetNotRunning(ctx context.Context, id xsf.ID) (*models.CronJob, error) {
result := x.DB().Session(&gorm.Session{Logger: glogger.Discard}).WithContext(ctx).Model(&models.CronJob{}).
Where("state = ?", enums.CronJob_State_NOT_RUNNING).Where("id = ?", id).
Update("state", enums.CronJob_State_RUNNING)
if result.Error != nil {
return nil, xerror.New(result.Error.Error())
}
if result.RowsAffected == 0 {
return nil, nil
}
return x.GetByID(ctx, id)
}
package dao
import (
"context"
"gitlab.wanzhuangkj.com/tush/xpkg/consts"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon/odao"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/database"
"golang.org/x/sync/singleflight"
)
type cronJobLogDao struct {
*odao.ODao[models.CronJobLog]
}
var CronJobLogDao = &cronJobLogDao{
ODao: &odao.ODao[models.CronJobLog]{
XDB: &database.XDB{
Schema: consts.DefaultSchema,
},
Sf: new(singleflight.Group),
},
}
func (x *cronJobLogDao) GetSliceByJobID(ctx context.Context, order string, jobID xsf.ID) ([]*models.CronJobLog, error) {
if jobID <= 0 {
return nil, nil
}
return x.GetOrderSliceByWhere(ctx, order, "job_id = ?", jobID)
}
func (x *cronJobLogDao) GetSliceByJobIDs(ctx context.Context, order string, jobIDs []xsf.ID) ([]*models.CronJobLog, error) {
if len(jobIDs) == 0 {
return nil, nil
}
cronJobLogs, err := x.GetOrderSliceByWhere(ctx, order, "job_id IN (?)", jobIDs)
if err != nil {
return nil, err
}
return cronJobLogs, nil
}
func (x *cronJobLogDao) GetMapSliceByJobIDs(ctx context.Context, order string, jobIDs []xsf.ID) (map[xsf.ID][]*models.CronJobLog, int, error) {
if len(jobIDs) == 0 {
return nil, 0, nil
}
cronJobLogs, err := x.GetOrderSliceByWhere(ctx, order, "job_id IN (?)", jobIDs)
if err != nil {
return nil, 0, err
}
itemMap := make(map[xsf.ID][]*models.CronJobLog)
for _, record := range cronJobLogs {
itemMap[record.JobID] = append(itemMap[record.JobID], record)
}
return itemMap, len(cronJobLogs), nil
}
package dao
import (
"context"
"gitlab.wanzhuangkj.com/tush/xpkg/consts"
"gitlab.wanzhuangkj.com/tush/xpkg/database"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon/odao"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"golang.org/x/sync/singleflight"
)
type cronJobRecordDao struct {
*odao.ODao[models.CronJobRecord]
}
var CronJobRecordDao = &cronJobRecordDao{
ODao: &odao.ODao[models.CronJobRecord]{
XDB: &database.XDB{
Schema: consts.DefaultSchema,
},
Sf: new(singleflight.Group),
},
}
func (x *cronJobRecordDao) GetSliceByJobID(ctx context.Context, order string, jobID xsf.ID) ([]*models.CronJobRecord, error) {
if jobID <= 0 {
return nil, nil
}
return x.GetOrderSliceByWhere(ctx, order, "job_id = ?", jobID)
}
func (x *cronJobRecordDao) GetSliceByJobIDs(ctx context.Context, order string, jobIDs []xsf.ID) ([]*models.CronJobRecord, error) {
if len(jobIDs) == 0 {
return nil, nil
}
cronJobRecords, err := x.GetOrderSliceByWhere(ctx, order, "job_id IN (?)", jobIDs)
if err != nil {
return nil, err
}
return cronJobRecords, nil
}
func (x *cronJobRecordDao) GetMapSliceByJobIDs(ctx context.Context, order string, jobIDs []xsf.ID) (map[xsf.ID][]*models.CronJobRecord, int, error) {
if len(jobIDs) == 0 {
return nil, 0, nil
}
cronJobRecords, err := x.GetOrderSliceByWhere(ctx, order, "job_id IN (?)", jobIDs)
if err != nil {
return nil, 0, err
}
itemMap := make(map[xsf.ID][]*models.CronJobRecord)
for _, record := range cronJobRecords {
itemMap[record.JobID] = append(itemMap[record.JobID], record)
}
return itemMap, len(cronJobRecords), nil
}
package dao
var (
Schema string
)
func Init(schema string) {
Schema = schema
}
package ecode
import "gitlab.wanzhuangkj.com/tush/xpkg/errcode"
var (
ErrCronJobNotFound = errcode.NewError(12001, "未查询到任务")
)
package enums
type CronJob_State_Enum int8
var (
CronJob_State_RUNNING CronJob_State_Enum = 1 // 运行中
CronJob_State_NOT_RUNNING CronJob_State_Enum = -1 // 未运行
)
func (x CronJob_State_Enum) Desc() string {
desc := "未知"
switch x {
case CronJob_State_RUNNING:
desc = "运行中"
case CronJob_State_NOT_RUNNING:
desc = "未运行"
}
return desc
}
type CronJob_Enable_Enum int8
var (
CronJob_Enable_CLOSE CronJob_Enable_Enum = -1 // 关闭
CronJob_Enable_OPEN CronJob_Enable_Enum = 1 // 开启
)
func (x CronJob_Enable_Enum) Desc() string {
desc := "未知"
switch x {
case CronJob_Enable_CLOSE:
desc = "关闭"
case CronJob_Enable_OPEN:
desc = "开启"
}
return desc
}
package models
import (
"time"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/enums"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xtime"
"gorm.io/gorm"
)
// CronJob 定时任务
type CronJob struct {
ID xsf.ID `json:"id" form:"id" swaggertype:"string" gorm:"column:id;type:bigint(20) unsigned;primaryKey;autoIncrement;comment:主键" example:"101"` //主键
JobName string `json:"jobName" form:"jobName" gorm:"column:job_name;type:varchar(128);comment:定时任务名称" example:"任务-自动确认订单"` //定时任务名称
JobCode string `json:"jobCode" form:"jobCode" gorm:"column:job_code;type:varchar(128);comment:定时任务code" example:"autoConfirmOrdersTask"` //定时任务code
TimeSpec string `json:"timeSpec" form:"timeSpec" gorm:"column:time_spec;type:varchar(128);comment:定时任务时间表达式" example:"0 0 0 * * ?"` //定时任务时间表达式
State enums.CronJob_State_Enum `json:"state" form:"state" gorm:"column:state;type:tinyint(4);default:1;comment:任务状态 枚举[-1:未运行 1:运行中 ]" example:"1"` //任务状态 枚举[-1:未运行 1:运行中 ]
Enable enums.CronJob_Enable_Enum `json:"enable" form:"enable" gorm:"column:enable;type:tinyint(1);default:1;comment:任务开关 枚举[-1:关闭 1:开启]" example:"1"` //任务开关 枚举[-1:关闭 1:开启]
Info string `json:"info" form:"info" gorm:"column:info;type:text;comment:额外信息" example:""` //额外信息
CreatedAt xtime.DateTime `json:"createdAt" form:"createdAt" gorm:"column:created_at;type:datetime;comment:创建时间" example:"2025-01-07 12:20:43"` //创建时间
UpdatedAt xtime.DateTime `json:"updatedAt" form:"updatedAt" gorm:"column:updated_at;type:datetime;comment:更新时间" example:"2025-01-07 12:20:43"` //更新时间
}
type CronJobInfo struct {
Before time.Duration `json:"before"`
}
var CronJobTool = CronJob{}
const TBCronJob = "cron_job"
func (CronJob) TableName() string {
return TBCronJob
}
func (x CronJob) GetID() xsf.ID {
return x.ID
}
func (x CronJob) GetOrder() string {
return "id asc"
}
func (x *CronJob) BeforeCreate(tx *gorm.DB) (err error) {
if x.CreatedAt.IsZero() {
x.CreatedAt = xtime.Now()
}
return nil
}
func (x *CronJob) BeforeUpdate(tx *gorm.DB) (err error) {
if x.UpdatedAt.IsZero() {
x.UpdatedAt = xtime.Now()
}
return nil
}
package models
import (
"gitlab.wanzhuangkj.com/tush/xpkg/utils/setUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xtime"
"gorm.io/gorm"
)
// CronJobLog 定时任务日志
type CronJobLog struct {
ID xsf.ID `json:"id" form:"id" swaggertype:"string" gorm:"column:id;type:bigint(20) unsigned;primaryKey;autoIncrement;comment:主键" example:"101"` //主键
JobID xsf.ID `json:"jobID" form:"jobID" swaggertype:"string" gorm:"column:job_id;type:bigint(20) unsigned;comment:job ID" example:"101"` //job ID
Info string `json:"info" form:"info" gorm:"column:info;type:text;comment:日志" example:"job occur error..."` //日志
CreatedAt xtime.DateTime `json:"createdAt" form:"createdAt" gorm:"column:created_at;type:datetime;comment:创建时间" example:"2025-01-07 12:20:43"` //创建时间
}
var CronJobLogTool = CronJobLog{}
const TBCronJobLog = "cron_job_log"
func (CronJobLog) TableName() string {
return TBCronJobLog
}
func (x CronJobLog) GetID() xsf.ID {
return x.ID
}
func (x *CronJobLog) Init() *CronJobLog {
return x
}
func (x *CronJobLog) GetJobID() xsf.ID {
return x.JobID
}
func (x *CronJobLog) GetJobIDs(rs []*CronJobLog) []xsf.ID {
set := setUtils.NewSet[xsf.ID]()
for _, r := range rs {
set.Add(r.GetJobID())
}
return set.Slice()
}
func (x CronJobLog) GetOrder() string {
return "id asc"
}
func (x *CronJobLog) BeforeCreate(tx *gorm.DB) (err error) {
if x.CreatedAt.IsZero() {
x.CreatedAt = xtime.Now()
}
return nil
}
func (x *CronJobLog) BeforeUpdate(tx *gorm.DB) (err error) {
return nil
}
package models
import (
"gitlab.wanzhuangkj.com/tush/xpkg/utils/setUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xtime"
"gorm.io/gorm"
)
// CronJobRecord 定时任务运行记录
type CronJobRecord struct {
ID xsf.ID `json:"id" form:"id" swaggertype:"string" gorm:"column:id;type:bigint(20) unsigned;primaryKey;autoIncrement;comment:主键" example:"101"` //主键
JobID xsf.ID `json:"jobID" form:"jobID" swaggertype:"string" gorm:"column:job_id;type:bigint(20) unsigned;comment:job ID" example:"101"` //job ID
BeginTime xtime.DateTime `json:"beginTime" form:"beginTime" gorm:"column:begin_time;type:datetime;default:CURRENT_TIMESTAMP;comment:开始时间" example:"2025-01-07 12:20:43"` //开始时间
EndTime xtime.DateTime `json:"endTime" form:"endTime" gorm:"column:end_time;type:datetime;default:CURRENT_TIMESTAMP;comment:结束时间" example:"2025-01-07 12:20:43"` //结束时间
Info string `json:"info" form:"info" gorm:"column:info;type:text;comment:额外信息" example:""` //额外信息
CreatedAt xtime.DateTime `json:"createdAt" form:"createdAt" gorm:"column:created_at;type:datetime;comment:创建时间" example:"2025-01-07 12:20:43"` //创建时间
UpdatedAt xtime.DateTime `json:"updatedAt" form:"updatedAt" gorm:"column:updated_at;type:datetime;comment:更新时间" example:"2025-01-07 12:20:43"` //更新时间
}
var CronJobRecordTool = CronJobRecord{}
const TBCronJobRecord = "cron_job_record"
func (CronJobRecord) TableName() string {
return TBCronJobRecord
}
func (x CronJobRecord) GetID() xsf.ID {
return x.ID
}
func (x *CronJobRecord) Init() *CronJobRecord {
return x
}
func (x *CronJobRecord) GetJobID() xsf.ID {
return x.JobID
}
func (x *CronJobRecord) GetJobIDs(rs []*CronJobRecord) []xsf.ID {
set := setUtils.NewSet[xsf.ID]()
for _, r := range rs {
set.Add(r.GetJobID())
}
return set.Slice()
}
func (x CronJobRecord) GetOrder() string {
return "id asc"
}
func (x *CronJobRecord) BeforeCreate(tx *gorm.DB) (err error) {
if x.CreatedAt.IsZero() {
x.CreatedAt = xtime.Now()
}
return nil
}
func (x *CronJobRecord) BeforeUpdate(tx *gorm.DB) (err error) {
if x.UpdatedAt.IsZero() {
x.UpdatedAt = xtime.Now()
}
return nil
}
package service
import (
"context"
"fmt"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/biz"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/dao"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/ecode"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/enums"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/types"
"github.com/jinzhu/copier"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/sliceUtils"
"gitlab.wanzhuangkj.com/tush/xpkg/xerrors/xerror"
)
type cronJobService struct {
*xcommon.Service
}
var CronJobService = newCronJobService()
func newCronJobService() *cronJobService {
return &cronJobService{
Service: xcommon.NewService(),
}
}
func (x *cronJobService) GetByJobCode(ctx context.Context, jobCode string, opts *biz.CronJobOpts) (*biz.CronJob, error) {
if jobCode == "" {
return nil, nil
}
cronJob, err := dao.CronJobDao.GetByJobCode(ctx, jobCode)
if err != nil {
return nil, err
}
return x.ToBiz(ctx, cronJob, opts)
}
func (x *cronJobService) GetNotRunning(ctx context.Context, id xsf.ID) (*models.CronJob, error) {
return dao.CronJobDao.GetNotRunning(ctx, id)
}
func (x *cronJobService) Create(ctx context.Context, req *types.CronJobCreateReq, opts *biz.CronJobOpts) (id xsf.ID, err error) {
cronJob := &models.CronJob{}
_ = copier.Copy(cronJob, req)
if err = dao.CronJobDao.Create(ctx, cronJob); err != nil {
return 0, err
}
return cronJob.ID, nil
}
func (x *cronJobService) DeleteByID(ctx context.Context, id xsf.ID, opts *biz.CronJobOpts) error {
if id <= 0 {
return nil
}
cronJob, err := x.GetByID(ctx, id, opts)
if err != nil {
return err
}
if cronJob == nil {
return xerror.NewC(ecode.ErrCronJobNotFound.Code(), fmt.Sprintf("未查询到%s[id:%d]", models.CronJobTool.TableName(), id))
}
return dao.CronJobDao.DeleteByID(ctx, id)
}
func (x *cronJobService) DeleteByIDs(ctx context.Context, ids []xsf.ID, opts *biz.CronJobOpts) error {
if len(ids) == 0 {
return nil
}
cronJobs, err := x.GetSliceByIDs(ctx, ids, opts)
if err != nil {
return err
}
err = sliceUtils.CompareSlice(ids, cronJobs, ecode.ErrCronJobNotFound)
if err != nil {
return err
}
return dao.CronJobDao.DeleteByIDs(ctx, ids)
}
func (x *cronJobService) StartByID(ctx context.Context, id xsf.ID, opts *biz.CronJobOpts) error {
req := &types.CronJobUpdateByIDReq{
ID: id,
Enable: enums.CronJob_Enable_OPEN,
}
return x.UpdateByID(ctx, req, opts)
}
func (x *cronJobService) StopByID(ctx context.Context, id xsf.ID, opts *biz.CronJobOpts) error {
req := &types.CronJobUpdateByIDReq{
ID: id,
Enable: enums.CronJob_Enable_CLOSE,
}
return x.UpdateByID(ctx, req, opts)
}
func (x *cronJobService) UpdateByID(ctx context.Context, req *types.CronJobUpdateByIDReq, opts *biz.CronJobOpts) error {
if req.ID <= 0 {
return nil
}
cronJob, err := x.GetByID(ctx, req.ID, opts)
if err != nil {
return err
}
if cronJob == nil {
return xerror.NewC(ecode.ErrCronJobNotFound.Code(), fmt.Sprintf("未查询到%s[id:%d]", models.CronJobTool.TableName(), req.ID))
}
cronJobUpd := &models.CronJob{}
_ = copier.Copy(cronJobUpd, req)
return dao.CronJobDao.UpdateByID(ctx, cronJobUpd.ID, cronJobUpd)
}
func (x *cronJobService) UpdateByIDs(ctx context.Context, req *types.CronJobUpdateByIDsReq, opts *biz.CronJobOpts) error {
if len(req.IDs) == 0 {
return nil
}
cronJobs, err := x.GetSliceByIDs(ctx, req.IDs, opts)
if err != nil {
return err
}
err = sliceUtils.CompareSlice(req.IDs, cronJobs, ecode.ErrCronJobNotFound)
if err != nil {
return err
}
cronJobUpd := &models.CronJob{}
_ = copier.Copy(cronJobUpd, req)
return dao.CronJobDao.UpdateByIDs(ctx, req.IDs, cronJobUpd)
}
func (x *cronJobService) GetByID(ctx context.Context, id xsf.ID, opts *biz.CronJobOpts) (cronJobBiz *biz.CronJob, err error) {
if id <= 0 {
return nil, nil
}
cronJob, err := dao.CronJobDao.GetByID(ctx, id)
if err != nil {
return nil, err
}
if cronJob == nil {
return nil, nil
}
return x.ToBiz(ctx, cronJob, opts)
}
func (x *cronJobService) GetSliceByIDs(ctx context.Context, ids []xsf.ID, opts *biz.CronJobOpts) ([]*biz.CronJob, error) {
if len(ids) == 0 {
return nil, nil
}
cronJobs, err := dao.CronJobDao.GetSliceByIDs(ctx, ids)
if err != nil {
return nil, err
}
if len(cronJobs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobs, opts)
}
func (x *cronJobService) GetMapByIDs(ctx context.Context, ids []xsf.ID, opts *biz.CronJobOpts) (map[xsf.ID]*biz.CronJob, error) {
if len(ids) == 0 {
return nil, nil
}
cronJobBizSlice, err := x.GetSliceByIDs(ctx, ids, opts)
if err != nil {
return nil, err
}
if len(cronJobBizSlice) == 0 {
return nil, nil
}
cronJobBizIDMap := biz.CronJobTool.SliceToIDMapBiz(cronJobBizSlice)
return cronJobBizIDMap, nil
}
func (x *cronJobService) Page(ctx context.Context, req *types.CronJobPageReq, opts *biz.CronJobOpts) ([]*biz.CronJob, int64, error) {
cronJobs, total, err := dao.CronJobDao.Page(ctx, req)
if err != nil {
return nil, 0, err
}
if len(cronJobs) == 0 {
return nil, total, nil
}
cronJobBizSlice, err := x.ToSliceBiz(ctx, cronJobs, opts)
if err != nil {
return nil, 0, err
}
return cronJobBizSlice, total, nil
}
func (x *cronJobService) GetSliceByState(ctx context.Context, state enums.CronJob_State_Enum, opts *biz.CronJobOpts) ([]*biz.CronJob, error) {
cronJobs, err := dao.CronJobDao.GetSliceByState(ctx, models.CronJobTool.GetOrder(), state)
if err != nil {
return nil, err
}
if len(cronJobs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobs, opts)
}
func (x *cronJobService) GetSliceByStates(ctx context.Context, states []enums.CronJob_State_Enum, opts *biz.CronJobOpts) ([]*biz.CronJob, error) {
if len(states) == 0 {
return nil, nil
}
cronJobMapSlice, size, err := dao.CronJobDao.GetMapSliceByStates(ctx, models.CronJobTool.GetOrder(), states)
if err != nil {
return nil, err
}
if len(cronJobMapSlice) == 0 {
return nil, nil
}
cronJobs := make([]*models.CronJob, 0, size)
for _, v := range cronJobMapSlice {
cronJobs = append(cronJobs, v...)
}
return x.ToSliceBiz(ctx, cronJobs, opts)
}
func (x *cronJobService) GetMapSliceByStates(ctx context.Context, states []enums.CronJob_State_Enum, opts *biz.CronJobOpts) (map[enums.CronJob_State_Enum][]*biz.CronJob, int, error) {
if len(states) == 0 {
return nil, 0, nil
}
cronJobBizSlice, err := x.GetSliceByStates(ctx, states, opts)
if err != nil {
return nil, 0, err
}
if len(cronJobBizSlice) == 0 {
return nil, 0, nil
}
ret := make(map[enums.CronJob_State_Enum][]*biz.CronJob)
for _, r := range cronJobBizSlice {
ret[r.State] = append(ret[r.State], r)
}
return ret, len(cronJobBizSlice), nil
}
func (x *cronJobService) GetSliceByEnable(ctx context.Context, enable enums.CronJob_Enable_Enum, opts *biz.CronJobOpts) ([]*biz.CronJob, error) {
cronJobs, err := dao.CronJobDao.GetSliceByEnable(ctx, models.CronJobTool.GetOrder(), enable)
if err != nil {
return nil, err
}
if len(cronJobs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobs, opts)
}
func (x *cronJobService) ToBiz(ctx context.Context, cronJob *models.CronJob, opts *biz.CronJobOpts) (cronJobBiz *biz.CronJob, err error) {
if cronJob == nil {
return nil, nil
}
cronJobBiz = biz.CronJobTool.NewFromModel(cronJob)
if err = x.Fill(ctx, cronJobBiz, opts); err != nil {
return nil, err
}
if err = biz.CronJobTool.Valid(ctx, cronJobBiz, opts); err != nil {
return nil, err
}
return cronJobBiz, nil
}
func (x *cronJobService) ToSliceBiz(ctx context.Context, cronJobs []*models.CronJob, opts *biz.CronJobOpts) (cronJobBizSlice []*biz.CronJob, err error) {
if len(cronJobs) == 0 {
return nil, nil
}
cronJobBizSlice = biz.CronJobTool.NewSliceFromModelsBiz(cronJobs)
if err = x.SliceFill(ctx, cronJobBizSlice, opts); err != nil {
return nil, err
}
if err = biz.CronJobTool.ValidSlice(ctx, cronJobBizSlice, opts); err != nil {
return nil, err
}
return cronJobBizSlice, nil
}
func (x *cronJobService) Fill(ctx context.Context, cronJobBiz *biz.CronJob, opts *biz.CronJobOpts) error {
if cronJobBiz == nil {
return nil
}
if opts != nil {
}
return nil
}
func (x *cronJobService) SliceFill(ctx context.Context, cronJobBizSlice []*biz.CronJob, opts *biz.CronJobOpts) error {
if len(cronJobBizSlice) == 0 {
return nil
}
if opts != nil {
}
return nil
}
func (x *cronJobService) GetAll(ctx context.Context, opts *biz.CronJobOpts) ([]*biz.CronJob, error) {
cronJobs, err := dao.CronJobDao.GetAll(ctx)
if err != nil {
return nil, err
}
if len(cronJobs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobs, opts)
}
package service
import (
"context"
"gitlab.wanzhuangkj.com/tush/xpkg/utils/xsf"
"gitlab.wanzhuangkj.com/tush/xpkg/xcommon"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/biz"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/dao"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/models"
"gitlab.wanzhuangkj.com/tush/xpkg/xcron/types"
)
type cronJobLogService struct {
*xcommon.Service
}
var CronJobLogService = newCronJobLogService()
func newCronJobLogService() *cronJobLogService {
return &cronJobLogService{
Service: xcommon.NewService(),
}
}
func (x *cronJobLogService) GetByID(ctx context.Context, id xsf.ID, opts *biz.CronJobLogOpts) (cronJobLogBiz *biz.CronJobLog, err error) {
if id <= 0 {
return nil, nil
}
cronJobLog, err := dao.CronJobLogDao.GetByID(ctx, id)
if err != nil {
return nil, err
}
if cronJobLog == nil {
return nil, nil
}
return x.ToBiz(ctx, cronJobLog, opts)
}
func (x *cronJobLogService) GetSliceByIDs(ctx context.Context, ids []xsf.ID, opts *biz.CronJobLogOpts) ([]*biz.CronJobLog, error) {
if len(ids) == 0 {
return nil, nil
}
cronJobLogs, err := dao.CronJobLogDao.GetSliceByIDs(ctx, ids)
if err != nil {
return nil, err
}
if len(cronJobLogs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobLogs, opts)
}
func (x *cronJobLogService) GetMapByIDs(ctx context.Context, ids []xsf.ID, opts *biz.CronJobLogOpts) (map[xsf.ID]*biz.CronJobLog, error) {
if len(ids) == 0 {
return nil, nil
}
cronJobLogBizSlice, err := x.GetSliceByIDs(ctx, ids, opts)
if err != nil {
return nil, err
}
if len(cronJobLogBizSlice) == 0 {
return nil, nil
}
cronJobLogBizIDMap := biz.CronJobLogTool.SliceToIDMapBiz(cronJobLogBizSlice)
return cronJobLogBizIDMap, nil
}
func (x *cronJobLogService) Page(ctx context.Context, req *types.CronJobLogPageReq, opts *biz.CronJobLogOpts) ([]*biz.CronJobLog, int64, error) {
cronJobLogs, total, err := dao.CronJobLogDao.Page(ctx, req)
if err != nil {
return nil, 0, err
}
if len(cronJobLogs) == 0 {
return nil, total, nil
}
cronJobLogBizSlice, err := x.ToSliceBiz(ctx, cronJobLogs, opts)
if err != nil {
return nil, 0, err
}
return cronJobLogBizSlice, total, nil
}
func (x *cronJobLogService) GetSliceByJobID(ctx context.Context, jobID xsf.ID, opts *biz.CronJobLogOpts) ([]*biz.CronJobLog, error) {
if jobID == 0 {
return nil, nil
}
cronJobLogs, err := dao.CronJobLogDao.GetSliceByJobID(ctx, models.CronJobLogTool.GetOrder(), jobID)
if err != nil {
return nil, err
}
if len(cronJobLogs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobLogs, opts)
}
func (x *cronJobLogService) GetSliceByJobIDs(ctx context.Context, jobIDs []xsf.ID, opts *biz.CronJobLogOpts) ([]*biz.CronJobLog, error) {
if len(jobIDs) == 0 {
return nil, nil
}
cronJobLogs, err := dao.CronJobLogDao.GetSliceByJobIDs(ctx, models.CronJobLogTool.GetOrder(), jobIDs)
if err != nil {
return nil, err
}
if len(cronJobLogs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobLogs, opts)
}
func (x *cronJobLogService) GetMapSliceByJobIDs(ctx context.Context, jobIDs []xsf.ID, opts *biz.CronJobLogOpts) (map[xsf.ID][]*biz.CronJobLog, int, error) {
if len(jobIDs) == 0 {
return nil, 0, nil
}
cronJobLogBizSlice, err := x.GetSliceByJobIDs(ctx, jobIDs, opts)
if err != nil {
return nil, 0, err
}
if len(cronJobLogBizSlice) == 0 {
return nil, 0, nil
}
ret := make(map[xsf.ID][]*biz.CronJobLog)
for _, r := range cronJobLogBizSlice {
ret[r.JobID] = append(ret[r.JobID], r)
}
return ret, len(cronJobLogBizSlice), nil
}
func (x *cronJobLogService) ToBiz(ctx context.Context, cronJobLog *models.CronJobLog, opts *biz.CronJobLogOpts) (cronJobLogBiz *biz.CronJobLog, err error) {
if cronJobLog == nil {
return nil, nil
}
cronJobLogBiz = biz.CronJobLogTool.NewFromModel(cronJobLog)
if err = x.Fill(ctx, cronJobLogBiz, opts); err != nil {
return nil, err
}
if err = biz.CronJobLogTool.Valid(ctx, cronJobLogBiz, opts); err != nil {
return nil, err
}
return cronJobLogBiz, nil
}
func (x *cronJobLogService) ToSliceBiz(ctx context.Context, cronJobLogs []*models.CronJobLog, opts *biz.CronJobLogOpts) (cronJobLogBizSlice []*biz.CronJobLog, err error) {
if len(cronJobLogs) == 0 {
return nil, nil
}
cronJobLogBizSlice = biz.CronJobLogTool.NewSliceFromModelsBiz(cronJobLogs)
if err = x.SliceFill(ctx, cronJobLogBizSlice, opts); err != nil {
return nil, err
}
if err = biz.CronJobLogTool.ValidSlice(ctx, cronJobLogBizSlice, opts); err != nil {
return nil, err
}
return cronJobLogBizSlice, nil
}
func (x *cronJobLogService) Fill(ctx context.Context, cronJobLogBiz *biz.CronJobLog, opts *biz.CronJobLogOpts) error {
if cronJobLogBiz == nil {
return nil
}
if opts != nil {
}
return nil
}
func (x *cronJobLogService) SliceFill(ctx context.Context, cronJobLogBizSlice []*biz.CronJobLog, opts *biz.CronJobLogOpts) error {
if len(cronJobLogBizSlice) == 0 {
return nil
}
if opts != nil {
}
return nil
}
func (x *cronJobLogService) GetAll(ctx context.Context, opts *biz.CronJobLogOpts) ([]*biz.CronJobLog, error) {
cronJobLogs, err := dao.CronJobLogDao.GetAll(ctx)
if err != nil {
return nil, err
}
if len(cronJobLogs) == 0 {
return nil, nil
}
return x.ToSliceBiz(ctx, cronJobLogs, opts)
}
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论