这里只测试最基本的点对点聊天。基于服务发现的示例需要“魔法”(网络代理),我的虚拟机环境跑不了,相关示例请到官网的 examples 中查看。
p2p.go
使用 go get 安装缺失的依赖库。
package main
import (
"bufio"
"context"
"crypto/rand"
"flag"
"fmt"
"io"
"log"
mrand "math/rand"
"os"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
)
// handleStream is the handler invoked for a newly opened chat stream. It wraps
// the stream in a buffered reader/writer pair and starts two goroutines: one
// that prints what arrives on the stream and one that forwards standard input
// onto it.
//
// The function returns immediately; it neither waits for the goroutines nor
// closes s.
func handleStream(s network.Stream) {
	log.Println("Got a new stream!")

	// Both goroutines share the same buffered view of the stream.
	in := bufio.NewReader(s)
	out := bufio.NewWriter(s)
	chat := bufio.NewReadWriter(in, out)

	go readData(chat)
	go writeData(chat)
}
// readData prints every line received from rw to standard output until the
// read side reaches end of stream or fails.
//
// Each non-blank line is printed in green (ANSI escape codes) followed by a
// fresh "> " prompt. Lines that consist only of "\n" are skipped. A trailing
// fragment that arrives without a terminating newline is still printed before
// the function returns. Read errors other than io.EOF are logged so that a
// broken stream is not mistaken for a clean end of stream.
func readData(rw *bufio.ReadWriter) {
	for {
		line, err := rw.ReadString('\n')
		if line != "" && line != "\n" {
			// Green text, then reset the colour and redraw the prompt.
			fmt.Printf("\x1b[32m%s\x1b[0m> ", line)
		}
		if err != nil {
			if err != io.EOF {
				log.Println(err)
			}
			return
		}
	}
}
// writeData forwards lines typed on standard input to rw until standard input
// is exhausted or the stream can no longer be written.
//
// A "> " prompt is printed before each read. Every line is sent with one extra
// "\n" appended (the line already ends in the newline that was typed) and is
// flushed immediately so the peer sees it without delay. Any error from
// reading stdin, writing, or flushing is logged and ends the loop; continuing
// after a failed write would only keep prompting for input that can never be
// delivered.
func writeData(rw *bufio.ReadWriter) {
	stdReader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("> ")
		sendData, err := stdReader.ReadString('\n')
		if err != nil {
			log.Println(err)
			return
		}
		if _, err := rw.WriteString(sendData + "\n"); err != nil {
			log.Println(err)
			return
		}
		if err := rw.Flush(); err != nil {
			log.Println(err)
			return
		}
	}
}
// main parses the command-line flags, creates the libp2p host and then either
// registers the chat stream handler and waits for a peer (no -d given) or
// dials the peer named by -d and starts the read/write goroutines.
//
// On the success path main never returns: the final empty select blocks
// forever, so the process runs until it is killed.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		sourcePort = flag.Int("sp", 0, "要开启的端口")
		dest       = flag.String("d", "", "要连接的目的地")
		help       = flag.Bool("help", false, "帮助")
		debug      = flag.Bool("debug", false, "Debug")
	)
	flag.Parse()

	if *help {
		fmt.Println("A服务器启动: ./chat -sp <SOURCE_PORT> <SOURCE_PORT>是端口数字")
		fmt.Println("B服务器启动: ./chat -d <MULTIADDR> <MULTIADDR>是A服务启动后返回的字符串")
		os.Exit(0)
	}

	// Key material comes from crypto/rand unless -debug is set, in which case
	// a math/rand source seeded with the port number is used instead.
	var r io.Reader = rand.Reader
	if *debug {
		r = mrand.New(mrand.NewSource(int64(*sourcePort)))
	}

	h, err := makeHost(*sourcePort, r)
	if err != nil {
		log.Println(err)
		return
	}

	if *dest != "" {
		rw, err := startPeerAndConnect(ctx, h, *dest)
		if err != nil {
			log.Println(err)
			return
		}
		// Start the goroutines that pump data in both directions.
		go writeData(rw)
		go readData(rw)
	} else {
		startPeer(ctx, h, handleStream)
	}

	// Block forever so the background goroutines keep running.
	select {}
}
// makeHost creates a libp2p host that listens for TCP connections on
// 0.0.0.0 at the given port and identifies itself with a freshly generated
// 2048-bit RSA key.
//
// randomness is the entropy source handed to the key generator. Errors from
// key generation and from building the listen address are wrapped with
// context and returned to the caller, which is responsible for reporting
// them; an invalid port therefore yields an error instead of a nil listen
// address being passed on to libp2p.
func makeHost(port int, randomness io.Reader) (host.Host, error) {
	// Identity key for this host.
	prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness)
	if err != nil {
		return nil, fmt.Errorf("generating host key: %w", err)
	}

	// 0.0.0.0 listens on every network interface.
	sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))
	if err != nil {
		return nil, fmt.Errorf("building listen address for port %d: %w", port, err)
	}

	return libp2p.New(
		libp2p.ListenAddrs(sourceMultiAddr),
		libp2p.Identity(prvKey),
	)
}
// startPeer puts the host into listening mode: it registers streamHandler for
// the "/chat/1.0.0" protocol and logs the command a second machine should run
// to connect to this host.
//
// The TCP port shown in that command is taken from the first listen address
// that carries a TCP component. If none is found, a message is logged and the
// function returns without printing the connection instructions (the handler
// stays registered). ctx is accepted but not used here.
func startPeer(ctx context.Context, h host.Host, streamHandler network.StreamHandler) {
	// The handler only fires on the receiving side of a stream.
	h.SetStreamHandler("/chat/1.0.0", streamHandler)

	tcpPort := ""
	for _, addr := range h.Network().ListenAddresses() {
		value, err := addr.ValueForProtocol(multiaddr.P_TCP)
		if err != nil {
			continue
		}
		tcpPort = value
		break
	}

	if tcpPort == "" {
		log.Println("端口不可位空")
		return
	}

	hostID := h.ID()
	log.Printf("在另外一台机器上启动命令 './chat -d /ip4/127.0.0.1/tcp/%v/p2p/%s' \n", tcpPort, hostID)
	log.Println("你可以替换 127.0.0.1 作为公网ip")
	log.Println("等待连接.....")
	log.Println()
}
// startPeerAndConnect dials the peer described by destination and opens a
// "/chat/1.0.0" stream to it.
//
// destination must be a multiaddr that includes the peer ID (for example the
// string printed by the listening side). The host's own addresses are logged
// first, the peer's addresses are stored in the peerstore with a permanent
// TTL, and the stream is opened using ctx so that cancelling the caller's
// context also aborts the attempt.
//
// It returns a buffered reader/writer over the new stream. Errors are wrapped
// with context and returned; reporting them is left to the caller.
func startPeerAndConnect(ctx context.Context, h host.Host, destination string) (*bufio.ReadWriter, error) {
	for _, la := range h.Addrs() {
		log.Printf(" - %v\n", la)
	}
	log.Println()

	maddr, err := multiaddr.NewMultiaddr(destination)
	if err != nil {
		return nil, fmt.Errorf("parsing destination %q: %w", destination, err)
	}

	info, err := peer.AddrInfoFromP2pAddr(maddr)
	if err != nil {
		return nil, fmt.Errorf("extracting peer info from %q: %w", destination, err)
	}

	// Record the peer's addresses so the host knows how to reach it.
	h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)

	// Use the caller's context rather than a fresh background one so that
	// cancellation propagates to the stream setup.
	s, err := h.NewStream(ctx, info.ID, "/chat/1.0.0")
	if err != nil {
		return nil, fmt.Errorf("opening chat stream to %s: %w", info.ID, err)
	}
	log.Println("Established connection to destination")

	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
	return rw, nil
}
使用方式:
A 服务器启动: ./chat -sp <SOURCE_PORT>    (<SOURCE_PORT> 是要监听的端口号)
B 服务器启动: ./chat -d <MULTIADDR>    (<MULTIADDR> 是 A 服务器启动后打印的地址字符串)