Commit d8fcc2e8 authored by qiaoweitong's avatar qiaoweitong
Browse files

完善

No related merge requests found
Pipeline #13352 failed with stages
Showing with 534 additions and 289 deletions
+534 -289
......@@ -41,7 +41,7 @@ func main() {
// defer resp.Body.Close()
// openlog.Info("REST Server sayhello[GET]: " + string(httputil.ReadBody(resp)))
// register()
sendstream()
sendmsg()
}
func register() {
......@@ -78,7 +78,7 @@ func sendmsg() {
time.Sleep(time.Second * 2)
messageid := GenUUID()
if err := invoker.Invoke(ctx, "WebsocketServer", "com.zhigui.xian.websocket.WS", "Send2Client",
&proto.Send2ClientReq{MessageId: messageid, ServerId: "abc", ClientId: "0oastmG8QqvWCuqJT9AyXxoxfs6qJo0DpTXsgRkhZpMfRYrTylvDeOjFOlxScc/e", Data: "data-xxxxxx" + strconv.Itoa(i)}, reply, core.WithProtocol("grpc")); err != nil {
&proto.Send2ClientReq{MessageId: messageid, ServerId: "xxx", ClientId: "CTZjrFcur/zpZej3H5y/LurGID5mG1Nv/Ph/HU8PZjzkLSlfbZ37N3ljvn6sIJv3Y5oHq6+hYCiytrlsFrrAoQ==", Data: "data-xxxxxx" + strconv.Itoa(i)}, reply, core.WithProtocol("grpc")); err != nil {
openlog.Error("error" + err.Error())
}
}
......@@ -116,7 +116,7 @@ func sendstream() {
messageid := GenUUID()
go func() {
err := stream.Send(&proto.Send2ClientReq{MessageId: messageid, ServerId: "abc", ClientId: "0kxC2Og4VlOH0uM0Y9mRt24mzt3EaTzlpescyaxGy7Jp7CxSLnAn0UKrToH1L3qF", Data: "data-aaa"})
err := stream.Send(&proto.Send2ClientReq{MessageId: messageid, ServerId: "abc", ClientId: "FaVQm16EBONPF4U4sg8CarR1NGdaUg8tu0AxgC+yFqbthc8Df+wCzr0WRpbPKLn6mX9pDqDZYei9sCM4ye6wbQ==", Data: "data-aaa"})
if err != nil {
panic(err)
}
......@@ -129,7 +129,7 @@ func sendstream() {
}()
go func() {
err := stream.Send(&proto.Send2ClientReq{MessageId: messageid, ServerId: "abc", ClientId: "8W0Sqzi/g8y5Vwl5odfztaqM9nicV5vqhz2uvWax4Fwqq1rlBQUo4Xp+wqEel4hL", Data: "data-bbb"})
err := stream.Send(&proto.Send2ClientReq{MessageId: messageid, ServerId: "abc", ClientId: "R2ltcIYr3yy7cSfYL7yKTrrbNpzR3pV5QzINB/Ej9WVrkEK8QQPY0jKSAMbaty2hKihx8tVyuWpo2DOK4rmYyQ==", Data: "data-bbb"})
if err != nil {
panic(err)
}
......
[common]
HttpPort = 6000
RPCPort = 7000
# 是否集群,单机则设为false
Cluster = false
# 对称加密key
CryptoKey =
[etcd]
Endpoints = 127.0.0.1:2379, 127.0.0.1:2379
\ No newline at end of file
......@@ -3,4 +3,9 @@ servicecomb:
address: http://127.0.0.1:30100
protocols:
grpc:
listenAddress: 127.0.0.1:5001
\ No newline at end of file
listenAddress: 0.0.0.0:5001
websocket:
server:
register: etcd
listenAddress: 0.0.0.0:5002
cryptoKey: Adba723b7fe06819
package config
import (
"errors"
"net"
"strings"
"sync"
"github.com/go-chassis/go-archaius"
)
type global struct {
LocalHost string //本机内网IP
ServerList map[string]string
ServerListLock sync.RWMutex
RPCPort string // rpc端口
RPCAddr string // rpc地址
RPCServiceName string // rpc service name
RegisterAddr string // 微服务注册地址
CryptoKey string // 对称加密秘钥
EtcdAddr string // etcd 地址
WebsocketAddr string // websocket 地址
}
var GlobalEnv = &global{}
func LoadEnv() error {
rpcAddr := archaius.GetString("servicecomb.protocols.grpc.listenAddress", "0.0.0.0:5001")
GlobalEnv.LocalHost = getIntranetIp()
GlobalEnv.RPCAddr = rpcAddr
GlobalEnv.RPCServiceName = archaius.GetString("servicecomb.service.name", "websocket")
GlobalEnv.RegisterAddr = archaius.GetString("servicecomb.registry.address", "http://0.0.0.0:30100")
etcdAddr := archaius.GetString("MICRO_REGISTRY_ADDRESS", "0.0.0.0:2379")
GlobalEnv.EtcdAddr = etcdAddr
GlobalEnv.WebsocketAddr = archaius.GetString("servicecomb.websocket.server.listenAddress", "0.0.0.0:5002")
GlobalEnv.CryptoKey = archaius.GetString("servicecomb.websocket.server.cryptoKey", "Adba723b7fe06819")
rpcAddrArr := strings.Split(rpcAddr, ":")
if len(rpcAddrArr) != 2 {
return errors.New("rpc地址配置错误")
}
GlobalEnv.RPCPort = rpcAddrArr[1]
return nil
}
//获取本机内网IP
func getIntranetIp() string {
addrs, _ := net.InterfaceAddrs()
for _, addr := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}
#!/bin/bash
export WEBSOCKET_PORT=5002
export WS_SERVER_REGISTER=etcd
export CHASSIS_HOME=$PWD
go run main.go
\ No newline at end of file
#!/bin/bash
export WEBSOCKET_PORT=5005
export WS_SERVER_REGISTER=etcd
export CHASSIS_HOME=$PWD
go run main.go
\ No newline at end of file
......@@ -20,7 +20,6 @@ require (
github.com/go-chassis/go-chassis/v2 v2.2.0
github.com/go-chassis/openlog v1.1.2
github.com/go-chassis/sc-client v0.6.0 // indirect
github.com/go-ini/ini v1.62.0
github.com/golang/protobuf v1.5.2
github.com/gorilla/websocket v1.4.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
......@@ -43,7 +42,6 @@ require (
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 // indirect
google.golang.org/genproto v0.0.0-20210614182748-5b3b54cad159 // indirect
google.golang.org/grpc v1.38.0
gopkg.in/ini.v1 v1.62.0 // indirect
k8s.io/client-go v0.21.1 // indirect
k8s.io/utils v0.0.0-20210527160623-6fdb442a123b // indirect
)
......
......@@ -150,8 +150,6 @@ github.com/go-chassis/seclog v1.3.0/go.mod h1:a/zGvX5BRiwtq/O0fRqS6VWjrBaXYtqFJB
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ini/ini v1.62.0 h1:7VJT/ZXjzqSrvtraFp4ONq80hTcRQth1c9ZnQ3uNQvU=
github.com/go-ini/ini v1.62.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
......@@ -716,8 +714,6 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
......
package main
import (
"gateway-codec/pkg/setting"
"fmt"
"gateway-codec/config"
"gateway-codec/wshandler"
"gateway-codec/wsserver"
"net"
......@@ -11,24 +12,27 @@ import (
"gateway-codec/pkg/etcd"
archaius "github.com/go-chassis/go-archaius"
"github.com/go-chassis/go-archaius"
_ "github.com/go-chassis/go-chassis-extension/protocol/grpc/server"
"github.com/go-chassis/go-chassis/v2"
"github.com/go-chassis/go-chassis/v2/core/server"
"github.com/go-chassis/openlog"
)
// 加载配置
func init() {
setting.Setup()
}
func main() {
chassis.RegisterSchema("grpc", &wshandler.WsHandler{}, server.WithRPCServiceDesc(&wsproto.WS_serviceDesc))
if err := chassis.Init(); err != nil {
openlog.Error("Init failed.")
return
}
// 加载配置到struct
err := config.LoadEnv()
if err != nil {
panic(err)
}
fmt.Println(archaius.GetConfigs())
fmt.Println(config.GlobalEnv)
// 启动websocket服务
go StartWebSocket()
chassis.Run()
......@@ -45,28 +49,28 @@ func StartWebSocket() {
wsserver.PingTimer()
// 监听端口
if err := http.ListenAndServe(":"+archaius.GetString("WEBSOCKET_PORT", "5002"), nil); err != nil {
if err := http.ListenAndServe(config.GlobalEnv.WebsocketAddr, nil); err != nil {
panic(err)
}
}
//ETCD注册发现服务
func registerServer() {
if wsserver.WsServerEnv() == "etcd" {
//
func registerEtcdServer() {
if wsserver.WsServerEnv() == wsserver.SERVER_ETCD {
//注册租约
ser, err := etcd.NewServiceReg([]string{"127.0.0.1:2379"}, 5)
ser, err := etcd.NewServiceReg([]string{config.GlobalEnv.EtcdAddr}, 5)
if err != nil {
panic(err)
}
hostPort := net.JoinHostPort("127.0.0.1", "5003")
hostPort := net.JoinHostPort(config.GlobalEnv.LocalHost, config.GlobalEnv.RPCPort)
//添加key
err = ser.PutService("/wsServers/"+hostPort, hostPort)
if err != nil {
panic(err)
}
cli, err := etcd.NewClientDis([]string{"127.0.0.1:2379"})
cli, err := etcd.NewClientDis([]string{config.GlobalEnv.EtcdAddr})
if err != nil {
panic(err)
}
......
......@@ -4,7 +4,7 @@ import (
"context"
"time"
"gateway-codec/pkg/setting"
"gateway-codec/config"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
......@@ -69,15 +69,15 @@ func (this *ClientDis) extractAddrs(resp *clientv3.GetResponse) []string {
}
func (this *ClientDis) SetServiceList(key, val string) {
setting.GlobalSetting.ServerListLock.Lock()
defer setting.GlobalSetting.ServerListLock.Unlock()
setting.GlobalSetting.ServerList[key] = val
config.GlobalEnv.ServerListLock.Lock()
defer config.GlobalEnv.ServerListLock.Unlock()
config.GlobalEnv.ServerList[key] = val
log.Info("发现服务:", key, " 地址:", val)
}
func (this *ClientDis) DelServiceList(key string) {
setting.GlobalSetting.ServerListLock.Lock()
defer setting.GlobalSetting.ServerListLock.Unlock()
delete(setting.GlobalSetting.ServerList, key)
config.GlobalEnv.ServerListLock.Lock()
defer config.GlobalEnv.ServerListLock.Unlock()
delete(config.GlobalEnv.ServerList, key)
log.Println("服务下线:", key)
}
......@@ -2,12 +2,12 @@ package etcd
import (
"context"
"gateway-codec/config"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/go-chassis/go-archaius"
"github.com/go-chassis/openlog"
)
......@@ -17,7 +17,7 @@ var mu sync.Mutex
func GetInstance() *clientv3.Client {
if etcdKvClient == nil {
if client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
Endpoints: []string{config.GlobalEnv.EtcdAddr},
DialTimeout: 5 * time.Second,
}); err != nil {
openlog.Error(err.Error())
......@@ -33,10 +33,6 @@ func GetInstance() *clientv3.Client {
return etcdKvClient
}
func EtcdEndpoints() string {
return archaius.GetString("MICRO_REGISTRY_ADDRESS", "127.0.0.1:2379")
}
func Put(key, value string) error {
_, err := GetInstance().Put(context.Background(), key, value)
return err
......
package setting
import (
"flag"
"log"
"net"
"sync"
"github.com/go-ini/ini"
)
type commonConf struct {
HttpPort string
RPCPort string
Cluster bool
CryptoKey string
}
var CommonSetting = &commonConf{}
type etcdConf struct {
Endpoints []string
}
var EtcdSetting = &etcdConf{}
type global struct {
LocalHost string //本机内网IP
ServerList map[string]string
ServerListLock sync.RWMutex
}
var GlobalSetting = &global{}
var cfg *ini.File
func Setup() {
configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini")
var err error
cfg, err = ini.Load(*configFile)
if err != nil {
log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err)
}
mapTo("common", CommonSetting)
mapTo("etcd", EtcdSetting)
GlobalSetting = &global{
LocalHost: getIntranetIp(),
ServerList: make(map[string]string),
}
}
func Default() {
CommonSetting = &commonConf{
HttpPort: "6000",
RPCPort: "7000",
Cluster: false,
CryptoKey: "Adba723b7fe06819",
}
GlobalSetting = &global{
LocalHost: getIntranetIp(),
ServerList: make(map[string]string),
}
}
// mapTo map section
func mapTo(section string, v interface{}) {
err := cfg.Section(section).MapTo(v)
if err != nil {
log.Fatalf("Cfg.MapTo %s err: %v", section, err)
}
}
//获取本机内网IP
func getIntranetIp() string {
addrs, _ := net.InterfaceAddrs()
for _, addr := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}
......@@ -39,7 +39,7 @@ func (m *Send2ClientReq) Reset() { *m = Send2ClientReq{} }
func (m *Send2ClientReq) String() string { return proto.CompactTextString(m) }
func (*Send2ClientReq) ProtoMessage() {}
func (*Send2ClientReq) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_4dcc51ba75c921dd, []int{0}
return fileDescriptor_ws_5d2e047a08ef9e59, []int{0}
}
func (m *Send2ClientReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Send2ClientReq.Unmarshal(m, b)
......@@ -112,7 +112,7 @@ func (m *RegisterReq) Reset() { *m = RegisterReq{} }
func (m *RegisterReq) String() string { return proto.CompactTextString(m) }
func (*RegisterReq) ProtoMessage() {}
func (*RegisterReq) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_4dcc51ba75c921dd, []int{1}
return fileDescriptor_ws_5d2e047a08ef9e59, []int{1}
}
func (m *RegisterReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RegisterReq.Unmarshal(m, b)
......@@ -154,7 +154,7 @@ func (m *Send2ServerReq) Reset() { *m = Send2ServerReq{} }
func (m *Send2ServerReq) String() string { return proto.CompactTextString(m) }
func (*Send2ServerReq) ProtoMessage() {}
func (*Send2ServerReq) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_4dcc51ba75c921dd, []int{2}
return fileDescriptor_ws_5d2e047a08ef9e59, []int{2}
}
func (m *Send2ServerReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Send2ServerReq.Unmarshal(m, b)
......@@ -209,6 +209,52 @@ func (m *Send2ServerReq) GetData() string {
return ""
}
type CloseClientReq struct {
ServerId string `protobuf:"bytes,1,opt,name=serverId,proto3" json:"serverId,omitempty"`
ClientId string `protobuf:"bytes,2,opt,name=clientId,proto3" json:"clientId,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CloseClientReq) Reset() { *m = CloseClientReq{} }
func (m *CloseClientReq) String() string { return proto.CompactTextString(m) }
func (*CloseClientReq) ProtoMessage() {}
func (*CloseClientReq) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_5d2e047a08ef9e59, []int{3}
}
func (m *CloseClientReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CloseClientReq.Unmarshal(m, b)
}
func (m *CloseClientReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_CloseClientReq.Marshal(b, m, deterministic)
}
func (dst *CloseClientReq) XXX_Merge(src proto.Message) {
xxx_messageInfo_CloseClientReq.Merge(dst, src)
}
func (m *CloseClientReq) XXX_Size() int {
return xxx_messageInfo_CloseClientReq.Size(m)
}
func (m *CloseClientReq) XXX_DiscardUnknown() {
xxx_messageInfo_CloseClientReq.DiscardUnknown(m)
}
var xxx_messageInfo_CloseClientReq proto.InternalMessageInfo
func (m *CloseClientReq) GetServerId() string {
if m != nil {
return m.ServerId
}
return ""
}
func (m *CloseClientReq) GetClientId() string {
if m != nil {
return m.ClientId
}
return ""
}
type Send2ClientReply struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
......@@ -219,7 +265,7 @@ func (m *Send2ClientReply) Reset() { *m = Send2ClientReply{} }
func (m *Send2ClientReply) String() string { return proto.CompactTextString(m) }
func (*Send2ClientReply) ProtoMessage() {}
func (*Send2ClientReply) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_4dcc51ba75c921dd, []int{3}
return fileDescriptor_ws_5d2e047a08ef9e59, []int{4}
}
func (m *Send2ClientReply) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Send2ClientReply.Unmarshal(m, b)
......@@ -249,7 +295,7 @@ func (m *RegisterReply) Reset() { *m = RegisterReply{} }
func (m *RegisterReply) String() string { return proto.CompactTextString(m) }
func (*RegisterReply) ProtoMessage() {}
func (*RegisterReply) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_4dcc51ba75c921dd, []int{4}
return fileDescriptor_ws_5d2e047a08ef9e59, []int{5}
}
func (m *RegisterReply) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RegisterReply.Unmarshal(m, b)
......@@ -279,7 +325,7 @@ func (m *Send2ServerReply) Reset() { *m = Send2ServerReply{} }
func (m *Send2ServerReply) String() string { return proto.CompactTextString(m) }
func (*Send2ServerReply) ProtoMessage() {}
func (*Send2ServerReply) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_4dcc51ba75c921dd, []int{5}
return fileDescriptor_ws_5d2e047a08ef9e59, []int{6}
}
func (m *Send2ServerReply) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Send2ServerReply.Unmarshal(m, b)
......@@ -299,13 +345,45 @@ func (m *Send2ServerReply) XXX_DiscardUnknown() {
var xxx_messageInfo_Send2ServerReply proto.InternalMessageInfo
type CloseClientReply struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CloseClientReply) Reset() { *m = CloseClientReply{} }
func (m *CloseClientReply) String() string { return proto.CompactTextString(m) }
func (*CloseClientReply) ProtoMessage() {}
func (*CloseClientReply) Descriptor() ([]byte, []int) {
return fileDescriptor_ws_5d2e047a08ef9e59, []int{7}
}
func (m *CloseClientReply) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CloseClientReply.Unmarshal(m, b)
}
func (m *CloseClientReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_CloseClientReply.Marshal(b, m, deterministic)
}
func (dst *CloseClientReply) XXX_Merge(src proto.Message) {
xxx_messageInfo_CloseClientReply.Merge(dst, src)
}
func (m *CloseClientReply) XXX_Size() int {
return xxx_messageInfo_CloseClientReply.Size(m)
}
func (m *CloseClientReply) XXX_DiscardUnknown() {
xxx_messageInfo_CloseClientReply.DiscardUnknown(m)
}
var xxx_messageInfo_CloseClientReply proto.InternalMessageInfo
func init() {
proto.RegisterType((*Send2ClientReq)(nil), "com.zhigui.xian.websocket.Send2ClientReq")
proto.RegisterType((*RegisterReq)(nil), "com.zhigui.xian.websocket.RegisterReq")
proto.RegisterType((*Send2ServerReq)(nil), "com.zhigui.xian.websocket.Send2ServerReq")
proto.RegisterType((*CloseClientReq)(nil), "com.zhigui.xian.websocket.CloseClientReq")
proto.RegisterType((*Send2ClientReply)(nil), "com.zhigui.xian.websocket.Send2ClientReply")
proto.RegisterType((*RegisterReply)(nil), "com.zhigui.xian.websocket.RegisterReply")
proto.RegisterType((*Send2ServerReply)(nil), "com.zhigui.xian.websocket.Send2ServerReply")
proto.RegisterType((*CloseClientReply)(nil), "com.zhigui.xian.websocket.CloseClientReply")
}
// Reference imports to suppress errors if they are not otherwise used.
......@@ -324,6 +402,7 @@ type WSClient interface {
Register(ctx context.Context, in *RegisterReq, opts ...grpc.CallOption) (*RegisterReply, error)
Send2Server(ctx context.Context, in *Send2ServerReq, opts ...grpc.CallOption) (*Send2ServerReply, error)
Send2ClientStream(ctx context.Context, opts ...grpc.CallOption) (WS_Send2ClientStreamClient, error)
CloseClient(ctx context.Context, in *CloseClientReq, opts ...grpc.CallOption) (*CloseClientReply, error)
}
type wSClient struct {
......@@ -392,12 +471,22 @@ func (x *wSSend2ClientStreamClient) Recv() (*Send2ClientReply, error) {
return m, nil
}
func (c *wSClient) CloseClient(ctx context.Context, in *CloseClientReq, opts ...grpc.CallOption) (*CloseClientReply, error) {
out := new(CloseClientReply)
err := c.cc.Invoke(ctx, "/com.zhigui.xian.websocket.WS/CloseClient", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// WSServer is the server API for WS service.
type WSServer interface {
Send2Client(context.Context, *Send2ClientReq) (*Send2ClientReply, error)
Register(context.Context, *RegisterReq) (*RegisterReply, error)
Send2Server(context.Context, *Send2ServerReq) (*Send2ServerReply, error)
Send2ClientStream(WS_Send2ClientStreamServer) error
CloseClient(context.Context, *CloseClientReq) (*CloseClientReply, error)
}
func RegisterWSServer(s *grpc.Server, srv WSServer) {
......@@ -484,6 +573,24 @@ func (x *wSSend2ClientStreamServer) Recv() (*Send2ClientReq, error) {
return m, nil
}
func _WS_CloseClient_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CloseClientReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(WSServer).CloseClient(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/com.zhigui.xian.websocket.WS/CloseClient",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WSServer).CloseClient(ctx, req.(*CloseClientReq))
}
return interceptor(ctx, in, info, handler)
}
var WS_serviceDesc = grpc.ServiceDesc{
ServiceName: "com.zhigui.xian.websocket.WS",
HandlerType: (*WSServer)(nil),
......@@ -500,6 +607,10 @@ var WS_serviceDesc = grpc.ServiceDesc{
MethodName: "Send2Server",
Handler: _WS_Send2Server_Handler,
},
{
MethodName: "CloseClient",
Handler: _WS_CloseClient_Handler,
},
},
Streams: []grpc.StreamDesc{
{
......@@ -512,29 +623,31 @@ var WS_serviceDesc = grpc.ServiceDesc{
Metadata: "proto/ws.proto",
}
func init() { proto.RegisterFile("proto/ws.proto", fileDescriptor_ws_4dcc51ba75c921dd) }
var fileDescriptor_ws_4dcc51ba75c921dd = []byte{
// 321 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0xcf, 0x4e, 0xc2, 0x40,
0x10, 0xc6, 0x59, 0xfe, 0x09, 0x43, 0x44, 0x9d, 0xd3, 0x4a, 0x3c, 0x90, 0x1e, 0x4c, 0x89, 0xc9,
0x6a, 0xf0, 0x11, 0x3c, 0x71, 0x6d, 0x0f, 0xde, 0x4c, 0x4a, 0x77, 0x52, 0x1b, 0x81, 0x96, 0xee,
0x2a, 0xe2, 0x33, 0xf8, 0x20, 0xfa, 0x96, 0xa6, 0x5b, 0x17, 0x4a, 0x82, 0x96, 0xc4, 0x78, 0x9b,
0x99, 0x7e, 0xdf, 0xe6, 0xfb, 0xcd, 0xa4, 0xd0, 0x4f, 0xb3, 0x44, 0x27, 0xd7, 0x2b, 0x25, 0x4c,
0x81, 0xe7, 0x61, 0x32, 0x17, 0x6f, 0x8f, 0x71, 0xf4, 0x1c, 0x8b, 0xd7, 0x38, 0x58, 0x88, 0x15,
0x4d, 0x55, 0x12, 0x3e, 0x91, 0x76, 0x3e, 0x18, 0xf4, 0x7d, 0x5a, 0xc8, 0xf1, 0xdd, 0x2c, 0xa6,
0x85, 0xf6, 0x68, 0x89, 0x03, 0xe8, 0x28, 0xca, 0x5e, 0x28, 0x9b, 0x48, 0xce, 0x86, 0xcc, 0xed,
0x7a, 0x9b, 0x1e, 0x2f, 0xa0, 0x3b, 0x27, 0xa5, 0x82, 0x88, 0x26, 0x92, 0xd7, 0xcd, 0xc7, 0xed,
0x20, 0x77, 0x86, 0xe6, 0x99, 0x89, 0xe4, 0x8d, 0xc2, 0x69, 0x7b, 0x44, 0x68, 0x86, 0x89, 0x24,
0xde, 0x1c, 0x32, 0xb7, 0xe5, 0x99, 0x1a, 0x39, 0x1c, 0x7d, 0x9b, 0x79, 0xcb, 0xc8, 0x6d, 0x9b,
0xab, 0x65, 0xa0, 0x03, 0xde, 0x36, 0x63, 0x53, 0x3b, 0x23, 0xe8, 0x79, 0x14, 0xc5, 0x4a, 0x53,
0x56, 0x11, 0xd3, 0x79, 0xb7, 0x54, 0xbe, 0x99, 0xfc, 0x8d, 0xca, 0x26, 0x6f, 0xec, 0x4f, 0xde,
0xdc, 0x9f, 0xbc, 0x55, 0x4a, 0x8e, 0x70, 0xba, 0xb3, 0xe3, 0x74, 0xb6, 0x76, 0x4e, 0xe0, 0x78,
0x4b, 0x93, 0x0f, 0xac, 0xc8, 0x46, 0x4e, 0x67, 0xeb, 0xf1, 0x67, 0x03, 0xea, 0xf7, 0x3e, 0x46,
0xd0, 0x2b, 0xf9, 0x71, 0x24, 0x7e, 0xbc, 0xa7, 0xd8, 0xbd, 0xe5, 0xe0, 0xea, 0x50, 0x69, 0x9e,
0xa0, 0x86, 0x0f, 0xd0, 0xb1, 0xa1, 0xf0, 0xf2, 0x17, 0x6b, 0xe9, 0x0e, 0x03, 0xf7, 0x20, 0x5d,
0xf1, 0xbe, 0x05, 0x29, 0x18, 0xab, 0x41, 0x36, 0xe7, 0xab, 0x06, 0x29, 0xad, 0xcd, 0xa9, 0xe1,
0x12, 0xce, 0x4a, 0x78, 0xbe, 0xce, 0x28, 0x98, 0xff, 0xdf, 0xde, 0x5c, 0x76, 0xc3, 0xa6, 0x6d,
0xf3, 0xaf, 0xdd, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0xbb, 0x1e, 0x2d, 0x4e, 0x7d, 0x03, 0x00,
0x00,
func init() { proto.RegisterFile("proto/ws.proto", fileDescriptor_ws_5d2e047a08ef9e59) }
var fileDescriptor_ws_5d2e047a08ef9e59 = []byte{
// 358 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xc1, 0x4e, 0xf2, 0x40,
0x10, 0xa6, 0x50, 0xf8, 0x61, 0xc8, 0x8f, 0x3a, 0xa7, 0x95, 0x78, 0x20, 0x3d, 0x98, 0x12, 0x93,
0x6a, 0xf0, 0x11, 0xb8, 0xc8, 0xb5, 0x3d, 0x78, 0x33, 0x29, 0xed, 0xa4, 0x36, 0x16, 0x16, 0xba,
0xab, 0x88, 0xcf, 0xe0, 0xc9, 0xa7, 0xf0, 0x31, 0x4d, 0x17, 0x16, 0xb6, 0x06, 0x81, 0xc4, 0x78,
0x9b, 0x99, 0xce, 0xcc, 0x7e, 0xdf, 0x37, 0x5f, 0x0a, 0x9d, 0x59, 0xce, 0x25, 0xbf, 0x5e, 0x08,
0x4f, 0x05, 0x78, 0x1e, 0xf1, 0x89, 0xf7, 0xf6, 0x98, 0x26, 0xcf, 0xa9, 0xf7, 0x9a, 0x86, 0x53,
0x6f, 0x41, 0x63, 0xc1, 0xa3, 0x27, 0x92, 0xce, 0xa7, 0x05, 0x9d, 0x80, 0xa6, 0xf1, 0x60, 0x98,
0xa5, 0x34, 0x95, 0x3e, 0xcd, 0xb1, 0x0b, 0x4d, 0x41, 0xf9, 0x0b, 0xe5, 0xa3, 0x98, 0x59, 0x3d,
0xcb, 0x6d, 0xf9, 0x9b, 0x1c, 0x2f, 0xa0, 0x35, 0x21, 0x21, 0xc2, 0x84, 0x46, 0x31, 0xab, 0xaa,
0x8f, 0xdb, 0x42, 0x31, 0x19, 0xa9, 0x35, 0xa3, 0x98, 0xd5, 0x56, 0x93, 0x3a, 0x47, 0x04, 0x3b,
0xe2, 0x31, 0x31, 0xbb, 0x67, 0xb9, 0x75, 0x5f, 0xc5, 0xc8, 0xe0, 0xdf, 0x7a, 0x98, 0xd5, 0x55,
0xbb, 0x4e, 0x8b, 0xee, 0x38, 0x94, 0x21, 0x6b, 0xa8, 0xb2, 0x8a, 0x9d, 0x3e, 0xb4, 0x7d, 0x4a,
0x52, 0x21, 0x29, 0x3f, 0x00, 0xd3, 0x79, 0xd7, 0xac, 0x02, 0x55, 0xf9, 0x1d, 0x2b, 0x8d, 0xbc,
0xb6, 0x1b, 0xb9, 0xbd, 0x1b, 0x79, 0xdd, 0x40, 0x7e, 0x07, 0x9d, 0x61, 0xc6, 0x05, 0x1d, 0xa7,
0xb1, 0xa9, 0x62, 0xb5, 0xac, 0xa2, 0x83, 0x70, 0x5a, 0xba, 0xd6, 0x2c, 0x5b, 0x3a, 0x27, 0xf0,
0x7f, 0xab, 0x4b, 0x51, 0xd0, 0x4d, 0x9a, 0xfc, 0xba, 0x56, 0x82, 0x30, 0xcb, 0x96, 0x83, 0x0f,
0x1b, 0xaa, 0xf7, 0x01, 0x26, 0xd0, 0x36, 0x76, 0x62, 0xdf, 0xfb, 0xd1, 0x2d, 0x5e, 0xd9, 0x29,
0xdd, 0xab, 0x63, 0x5b, 0x0b, 0x04, 0x15, 0x7c, 0x80, 0xa6, 0x06, 0x8a, 0x97, 0x7b, 0x46, 0x8d,
0x2b, 0x77, 0xdd, 0xa3, 0xfa, 0x56, 0xfb, 0x35, 0x91, 0x15, 0xef, 0xc3, 0x44, 0x36, 0xe6, 0x38,
0x4c, 0xc4, 0x94, 0xb2, 0x82, 0x73, 0x38, 0x33, 0xe8, 0x05, 0x32, 0xa7, 0x70, 0xf2, 0x77, 0xba,
0xb9, 0xd6, 0x8d, 0x55, 0x70, 0x33, 0xee, 0xb7, 0xf7, 0xb1, 0xb2, 0xd5, 0xf6, 0x3e, 0xf6, 0xdd,
0x12, 0x4e, 0x65, 0xdc, 0x50, 0xbf, 0x8c, 0xdb, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x01, 0xe8,
0xd3, 0x21, 0x44, 0x04, 0x00, 0x00,
}
......@@ -11,6 +11,8 @@ service WS {
}
rpc Send2ClientStream (stream Send2ClientReq) returns (stream Send2ClientReply) {
}
rpc CloseClient (CloseClientReq) returns (CloseClientReply) {
}
}
message Send2ClientReq {
......@@ -34,6 +36,11 @@ message Send2ServerReq {
string data = 5;
}
message CloseClientReq {
string serverId = 1;
string clientId = 2;
}
message Send2ClientReply {
}
......@@ -43,3 +50,6 @@ message RegisterReply {
message Send2ServerReply{
}
message CloseClientReply {
}
......@@ -19,19 +19,32 @@ func (ws *WsHandler) Send2Client(ctx context.Context, req *wsproto.Send2ClientRe
return nil, errors.New("clientId不能为空")
}
// 获取连接client对象
conn, err := wsserver.Manager.GetByClientId(req.ClientId)
// 通过clientId解析服务ip
_, _, _, isLocal, err := wsserver.GetAddrInfoAndIsLocal(req.ClientId)
if err != nil {
openlog.Error("消息发送异常:" + err.Error())
openlog.Error(err.Error())
return nil, err
}
if conn != nil {
// 发送消息到指定客户端
wsserver.SendMessage2Client(req.MessageId, req.ClientId, int(req.Code), req.Message, &req.Data)
//如果是本机则发送到本机
if isLocal {
// 获取连接client对象
conn, err := wsserver.Manager.GetByClientId(req.ClientId)
if err != nil {
openlog.Error("消息发送异常:" + err.Error())
return nil, err
}
if conn != nil {
// 发送消息到指定客户端
wsserver.SendMessage2Client(req.MessageId, req.ClientId, int(req.Code), req.Message, &req.Data)
} else {
return nil, errors.New("websocket conn连接异常")
}
} else {
return nil, errors.New("websocket conn连接异常")
//转发信息到指定的机器
wsserver.SendMessage2Client(req.MessageId, req.ClientId, int(req.Code), req.Message, &req.Data)
}
return &wsproto.Send2ClientReply{}, nil
......@@ -50,19 +63,30 @@ func (ws *WsHandler) Send2ClientStream(stream wsproto.WS_Send2ClientStreamServer
return errors.New("clientId不能为空")
}
// 获取连接client对象
conn, err := wsserver.Manager.GetByClientId(req.ClientId)
// 通过clientId解析服务ip
_, _, _, isLocal, err := wsserver.GetAddrInfoAndIsLocal(req.ClientId)
if err != nil {
openlog.Error("消息发送异常:" + err.Error())
openlog.Error(err.Error())
return err
}
if conn != nil {
// 发送消息到指定客户端
wsserver.SendMessage2Client(req.MessageId, req.ClientId, int(req.Code), req.Message, &req.Data)
//如果是本机则发送到本机
if isLocal {
// 获取连接client对象
conn, err := wsserver.Manager.GetByClientId(req.ClientId)
if err != nil {
openlog.Error("消息发送异常:" + err.Error())
return err
}
if conn != nil {
// 发送消息到指定客户端
wsserver.SendMessage2Client(req.MessageId, req.ClientId, int(req.Code), req.Message, &req.Data)
} else {
return errors.New("websocket conn连接异常")
}
} else {
return errors.New("websocket conn连接异常")
wsserver.SendMessage2Client(req.MessageId, req.ClientId, int(req.Code), req.Message, &req.Data)
}
err = stream.Send(&wsproto.Send2ClientReply{})
......@@ -70,7 +94,6 @@ func (ws *WsHandler) Send2ClientStream(stream wsproto.WS_Send2ClientStreamServer
openlog.Error(fmt.Sprintf("stream send err: %v", err))
return err
}
}
}
......@@ -97,21 +120,58 @@ func (ws *WsHandler) Send2Server(ctx context.Context, req *wsproto.Send2ServerRe
fmt.Println(clients)
for _, v := range clients {
conn, err := wsserver.Manager.GetByClientId(v)
// 通过clientId解析服务ip
_, _, _, isLocal, err := wsserver.GetAddrInfoAndIsLocal(v)
if err != nil {
openlog.Error("消息发送异常:" + err.Error())
openlog.Error(err.Error())
return nil, err
}
if conn != nil {
// 发送消息到指定客户端
fmt.Println("Send2Server:", req.Data)
wsserver.SendMessage2Client(req.MessageId, v, int(req.Code), req.Message, &req.Data)
//如果是本机则发送到本机
if isLocal {
conn, err := wsserver.Manager.GetByClientId(v)
if err != nil {
openlog.Error("Send2Server err:" + err.Error())
return nil, err
}
if conn != nil {
// 发送消息到指定客户端
fmt.Println("Send2Server:", req.Data)
wsserver.SendMessage2Client(req.MessageId, v, int(req.Code), req.Message, &req.Data)
} else {
return nil, errors.New("websocket conn连接异常")
}
} else {
return nil, errors.New("websocket conn连接异常")
wsserver.SendMessage2Client(req.MessageId, v, int(req.Code), req.Message, &req.Data)
}
}
return &wsproto.Send2ServerReply{}, nil
}
func (ws *WsHandler) CloseClient(ctx context.Context, req *wsproto.CloseClientReq) (*wsproto.CloseClientReply, error) {
fmt.Println("CloseClient request:", req.ServerId)
if len(req.ClientId) == 0 || len(req.ServerId) == 0 {
return nil, errors.New("clientId or ServerId不能为空")
}
// 获取连接client对象
conn, err := wsserver.Manager.GetByClientId(req.ClientId)
if err != nil {
openlog.Error("CloseClient err:" + err.Error())
return nil, err
}
if conn != nil {
wsserver.CloseLocalClient(req.ClientId, req.ServerId)
} else {
return nil, errors.New("websocket conn连接异常")
}
return &wsproto.CloseClientReply{}, nil
}
......@@ -102,10 +102,8 @@ func NewClientManager() (clientManager *ClientManager) {
// 管道处理程序
func (manager *ClientManager) Start() {
for {
select {
case client := <-manager.Connect:
fmt.Println("用户连接进来了")
// 建立连接事件
......@@ -124,7 +122,7 @@ func (manager *ClientManager) EventConnect(client *Client) {
fmt.Println("clientId:", client.ClientId)
}
// 断开连接时间
// 断开连接事件
func (manager *ClientManager) EventDisconnect(client *Client) {
//关闭连接
_ = client.Socket.Close()
......
package wsserver
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"io"
)
func Encrypt(rawData, key []byte) (string, error) {
data, err := aesCBCEncrypt(rawData, key)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(data), nil
}
func Decrypt(rawData string, key []byte) (string, error) {
data, err := base64.StdEncoding.DecodeString(rawData)
if err != nil {
return "", err
}
dnData, err := aesCBCDncrypt(data, key)
if err != nil {
return "", err
}
return string(dnData), nil
}
func pKCS7Padding(ciphertext []byte, blockSize int) []byte {
padding := blockSize - len(ciphertext)%blockSize
padtext := bytes.Repeat([]byte{byte(padding)}, padding)
return append(ciphertext, padtext...)
}
func pKCS7UnPadding(origData []byte) ([]byte, error) {
length := len(origData)
unpadding := int(origData[length-1])
if length-unpadding < 0 || length-unpadding > len(origData) {
return nil, errors.New("unPadding error")
}
return origData[:(length - unpadding)], nil
}
//AES加密
func aesCBCEncrypt(rawData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return []byte{}, err
}
//填充原文
blockSize := block.BlockSize()
rawData = pKCS7Padding(rawData, blockSize)
//初始向量IV必须是唯一,但不需要保密
cipherText := make([]byte, blockSize+len(rawData))
//block大小 16
iv := cipherText[:blockSize]
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
return []byte{}, err
}
//block大小和初始向量大小一定要一致
mode := cipher.NewCBCEncrypter(block, iv)
mode.CryptBlocks(cipherText[blockSize:], rawData)
return cipherText, nil
}
//AES解密
func aesCBCDncrypt(encryptData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return []byte{}, err
}
blockSize := block.BlockSize()
if len(encryptData) < blockSize {
return []byte{}, errors.New("ciphertext too short")
}
iv := encryptData[:blockSize]
encryptData = encryptData[blockSize:]
if len(encryptData)%blockSize != 0 {
return []byte{}, errors.New("ciphertext is not a multiple of the block size")
}
mode := cipher.NewCBCDecrypter(block, iv)
mode.CryptBlocks(encryptData, encryptData)
//解填充
encryptData, err = pKCS7UnPadding(encryptData)
return encryptData, err
}
package wsserver
import (
"context"
"fmt"
pb "gateway-codec/proto"
"github.com/go-chassis/openlog"
"google.golang.org/grpc"
)
func grpcConn(addr string) *grpc.ClientConn {
fmt.Println("grpcConn:", addr)
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
openlog.Error(fmt.Sprintf("did not connect: %v", err))
}
return conn
}
// 给指定的机器发送信息
func SendRpc2Client(addr string, messageId, clientId string, code int, message string, data *string) {
conn := grpcConn(addr)
defer conn.Close()
c := pb.NewWSClient(conn)
_, err := c.Send2Client(context.Background(), &pb.Send2ClientReq{
MessageId: messageId,
ClientId: clientId,
Code: int32(code),
Message: message,
Data: *data,
})
if err != nil {
openlog.Error(fmt.Sprintf("failed to call: %v", err))
}
}
func CloseRpcClient(addr string, clientId, serverId string) {
conn := grpcConn(addr)
defer conn.Close()
c := pb.NewWSClient(conn)
_, err := c.CloseClient(context.Background(), &pb.CloseClientReq{
ServerId: serverId,
ClientId: clientId,
})
if err != nil {
openlog.Error(fmt.Sprintf("failed to call: %v", err))
}
}
......@@ -21,7 +21,6 @@ type accountInfo struct {
RegisterTime int64 `json:"registerTime"`
}
//TODO 是否存db, mysql, redis, etcd
var ServerMap sync.Map
//Register 注册系统到websocket服务
......@@ -54,10 +53,10 @@ func Register(serverId string) error {
//注册
err = etcd.Put(ETCD_PREFIX+serverId, string(jsonBytes))
if err != nil {
panic(err)
return err
}
case SERVER_REDIS:
// TODO 注册到redis
fmt.Println("redis etcd")
default:
if _, ok := ServerMap.Load(serverId); ok {
......
package wsserver
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"io"
"gateway-codec/config"
"strings"
archaius "github.com/go-chassis/go-archaius"
"github.com/go-chassis/openlog"
uuid "github.com/satori/go.uuid"
)
......@@ -25,105 +21,60 @@ func GenUUID() string {
return string(uuidByt[8:24])
}
//TODO clientId生成规则
func GenClientId() string {
raw := []byte(GenUUID())
str, err := Encrypt(raw, []byte(GenUUID()))
//TODO clientId生成规则, 本机ip+端口+servicename+uuid
func GenClientId() (string, error) {
raw := []byte(config.GlobalEnv.LocalHost + ":" + config.GlobalEnv.RPCPort + ":" + GenUUID())
str, err := Encrypt(raw, []byte(config.GlobalEnv.CryptoKey))
if err != nil {
panic(err)
openlog.Error(fmt.Sprintf("Encrypt err:%s", err))
return "", err
}
return str
return str, nil
}
func Encrypt(rawData, key []byte) (string, error) {
data, err := aesCBCEncrypt(rawData, key)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(data), nil
func WsServerEnv() string {
return archaius.GetString("servicecomb.websocket.server.register", "local")
}
func Decrypt(rawData string, key []byte) (string, error) {
data, err := base64.StdEncoding.DecodeString(rawData)
if err != nil {
return "", err
func ParseRpcAddrValue(value string) (host string, port string, err error) {
if value == "" {
openlog.Error("ParseRpcAddrValue err value:" + value)
err = errors.New("解析地址错误")
return
}
dnData, err := aesCBCDncrypt(data, key)
if err != nil {
return "", err
addr := strings.Split(value, ":")
if len(addr) != 3 {
err = errors.New("解析地址错误")
return
}
return string(dnData), nil
}
host, port = addr[0], addr[1]
func pKCS7Padding(ciphertext []byte, blockSize int) []byte {
padding := blockSize - len(ciphertext)%blockSize
padtext := bytes.Repeat([]byte{byte(padding)}, padding)
return append(ciphertext, padtext...)
return
}
func pKCS7UnPadding(origData []byte) ([]byte, error) {
length := len(origData)
unpadding := int(origData[length-1])
if length-unpadding < 0 || length-unpadding > len(origData) {
return nil, errors.New("unPadding error")
}
return origData[:(length - unpadding)], nil
//判断地址是否为本机
func IsAddrLocal(host string, port string) bool {
return host == config.GlobalEnv.LocalHost && port == config.GlobalEnv.RPCPort
}
//AES加密
func aesCBCEncrypt(rawData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
//获取client key地址信息
func GetAddrInfoAndIsLocal(clientId string) (addr string, host string, port string, isLocal bool, err error) {
//解密ClientId
addr, err = Decrypt(clientId, []byte(config.GlobalEnv.CryptoKey))
if err != nil {
return []byte{}, err
}
//填充原文
blockSize := block.BlockSize()
rawData = pKCS7Padding(rawData, blockSize)
//初始向量IV必须是唯一,但不需要保密
cipherText := make([]byte, blockSize+len(rawData))
//block大小 16
iv := cipherText[:blockSize]
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
return []byte{}, err
return
}
//block大小和初始向量大小一定要一致
mode := cipher.NewCBCEncrypter(block, iv)
mode.CryptBlocks(cipherText[blockSize:], rawData)
return cipherText, nil
}
//AES解密
func aesCBCDncrypt(encryptData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
host, port, err = ParseRpcAddrValue(addr)
if err != nil {
return []byte{}, err
return
}
blockSize := block.BlockSize()
if len(encryptData) < blockSize {
return []byte{}, errors.New("ciphertext too short")
}
iv := encryptData[:blockSize]
encryptData = encryptData[blockSize:]
if len(encryptData)%blockSize != 0 {
return []byte{}, errors.New("ciphertext is not a multiple of the block size")
}
mode := cipher.NewCBCDecrypter(block, iv)
mode.CryptBlocks(encryptData, encryptData)
//解填充
encryptData, err = pKCS7UnPadding(encryptData)
return encryptData, err
isLocal = IsAddrLocal(host, port)
return
}
func WsServerEnv() string {
return archaius.GetString("WS_SERVER_REGISTER", "local")
func GetRpcPort() string {
return ""
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment