随着业务的发展,线上部署肯定不会是一台服务器,当多个服务器的时候,前面的程序是无法满足的,当一个用户连接到主机A,而另一个用户连接到主机B,这两个用户又如何通信呢?
本篇我们来解决这个问题,我们通过在生成的连接标识到带有相关信息和转发机制来实现,代码如下:
package main /* * 支持分布式 */ import ( "bufio" "encoding/json" "flag" "fmt" "math/rand" "net" "strings" "sync" "time" ) //保存连接 var connList sync.Map var Ip, Port string //消息结构{"from":"aaa","to":"bbb","body":"aaa","cmd":"sendMsg"} type UserMsg struct { From string `json:"from"` To string `json:"to"` Body string `json:"body"` Cmd string `json:"cmd"` } //随机字符串 func randName() string { var letterRunes = []rune("123456789") rand.Seed(time.Now().UnixNano()) b := make([]rune, 6) for i := range b { b[i] = letterRunes[rand.Intn(len(letterRunes))] } return Ip + ":" + Port + ":" + string(b) } //得到本机IP func getLocalIpV4() string { inters, err := net.Interfaces() if err != nil { panic(err) } for _, inter := range inters { // 判断网卡是否开启,过滤本地环回接口 if inter.Flags&net.FlagUp != 0 && !strings.HasPrefix(inter.Name, "lo") { // 获取网卡下所有的地址 addrs, err := inter.Addrs() if err != nil { continue } for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { //判断是否存在IPV4 IP 如果没有过滤 if ipnet.IP.To4() != nil { return ipnet.IP.String() } } } } } return "" } //转发消息 func forwardMsg(ip, port, msg string) { conn, _ := net.Dial("tcp", ip+":"+port) defer conn.Close() conn.Write([]byte(msg)) time.Sleep(time.Second) } func DealConn(conn net.Conn) { // 处理完关闭连接 defer conn.Close() fmt.Println("new conn\n") name := randName() connList.Store(name, conn) conn.Write([]byte("you is : " + name + "\r\n")) // 针对当前连接做发送和接受操作 for { reader := bufio.NewReader(conn) // 读取字符串, 直到碰到回车返回 str, err := reader.ReadString('\n') // 数据读取正确 if err == nil { // 去掉字符串尾部的回车 str = strings.TrimSpace(str) var strStruct UserMsg err := json.Unmarshal([]byte(str), &strStruct) if err != nil { fmt.Printf("msg json error, err:%v\n", err) } else { to := strStruct.To cmd := strStruct.Cmd if cmd == "sendMsg" { toArr := strings.Split(to, ":") if toArr[0] == Ip && toArr[1] == Port { toUser, ok := connList.Load(to) if !ok { fmt.Println("to user is not\n") } else { toUserConn, ok := toUser.(net.Conn) if !ok { fmt.Println("to user type wrong\n") } else { toUserConn.Write([]byte(strStruct.From + " say : " + strStruct.Body + "\r\n")) } } } else { forwardMsg(toArr[0], toArr[1], str+"\n") fmt.Println("forwardMsg " + toArr[0] + toArr[1] + str) } } else { fmt.Println("unkonw msg") } } } else { //删除保存的连接 connList.Delete(name) fmt.Println("conn close\n") break } } } func main() { flag.StringVar(&Port, "Port", "1215", "端口") flag.Parse() Ip = getLocalIpV4() // 建立tcp 服务 listen, err := net.Listen("tcp", Ip+":"+Port) if err != nil { fmt.Printf("listen failed, err:%v\n", err) return } fmt.Println("listen " + Ip + ":" + Port) for { // 等待客户端建立连接 fmt.Printf("accept conn ...... \n") conn, err := listen.Accept() if err != nil { fmt.Printf("accept failed, err:%v\n", err) continue } // 启动一个单独的 goroutine 去处理连接 go DealConn(conn) } }
这是服务端代码,我们发消息的格式如下:
{"from":"127.0.0.1:1216:123456","to":"127.0.0.1:1215:651444","body":"1234567","cmd":"sendMsg"}
这样我们就可能实现一个可以分布属部署的程序了,不过还有一种简单且使用较多的分布式方式,那就是利用redis的订阅发布机制,客户端通过socket服务来订阅redis来实现消息的收发,这里不做详细介绍,感兴趣的同学可以尝试做一下!
还可以利用redis的发布订阅来实现消息传递进而实现分布式部署