feat: SDWAN data plane + UDP punch port fix + TUN reader

SDWAN:
- protocol: add SDWANConfig/SDWANPeer/SDWANPacket structs, MsgTunnel type
- server: sdwan.go (JSON file store), sdwan_api.go (Get/Set/broadcast/route)
- server: push SDWAN config on login, announce peer online/offline events
- server: RouteSDWANPacket routes TUN packets between nodes via signaling
- client: TUN device setup (optun), tunReadLoop reads IP packets
- client: handle SDWANConfig/SDWANPeer/SDWANDel push messages
- client: apply routes (per-node /32 + broad CIDR fallback)

UDP punch fix:
- nat/detect: capture LocalPort from STUN UDP socket for punch binding
- client: pass publicPort + localPort through login and punch config
- coordinator: include PublicPort in PunchParams for both sides
- protocol: add PublicPort to LoginReq and ReportBasic

Other:
- server: use client-reported PublicIP instead of raw r.RemoteAddr
- server: update PublicIP/Port from ReportBasic if provided
- client: config file loading with zero-value defaults backfill
- .gitignore: exclude run/, *.pid, *.log, sdwan.json
- go.mod: add golang.org/x/sys for TUN ioctl
This commit is contained in:
2026-03-02 17:48:05 +08:00
parent 673e354fe5
commit 5568ea67d9
12 changed files with 680 additions and 37 deletions

View File

@@ -5,12 +5,18 @@ import (
"crypto/tls"
"fmt"
"log"
"net"
"net/netip"
"net/url"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"time"
"golang.org/x/sys/unix"
"github.com/gorilla/websocket"
"github.com/openp2p-cn/inp2p/pkg/auth"
"github.com/openp2p-cn/inp2p/pkg/config"
@@ -24,24 +30,35 @@ import (
// Client is the INP2P client node.
type Client struct {
cfg config.ClientConfig
conn *signal.Conn
natType protocol.NATType
publicIP string
tunnels map[string]*tunnel.Tunnel // peerNode → tunnel
tMu sync.RWMutex
relayMgr *relay.Manager
quit chan struct{}
wg sync.WaitGroup
cfg config.ClientConfig
conn *signal.Conn
natType protocol.NATType
publicIP string
publicPort int
localPort int
tunnels map[string]*tunnel.Tunnel // peerNode → tunnel
tMu sync.RWMutex
relayMgr *relay.Manager
sdwanMu sync.RWMutex
sdwan protocol.SDWANConfig
sdwanIP string
sdwanStop chan struct{}
tunMu sync.Mutex
tunFile *os.File
quit chan struct{}
wg sync.WaitGroup
}
// New creates a new client.
func New(cfg config.ClientConfig) *Client {
c := &Client{
cfg: cfg,
natType: protocol.NATUnknown,
tunnels: make(map[string]*tunnel.Tunnel),
quit: make(chan struct{}),
cfg: cfg,
natType: protocol.NATUnknown,
tunnels: make(map[string]*tunnel.Tunnel),
sdwanStop: make(chan struct{}),
quit: make(chan struct{}),
publicPort: 0,
localPort: 0,
}
if cfg.RelayEnabled {
@@ -76,7 +93,9 @@ func (c *Client) connectAndRun() error {
)
c.natType = natResult.Type
c.publicIP = natResult.PublicIP
log.Printf("[client] NAT type=%s, publicIP=%s", c.natType, c.publicIP)
c.publicPort = natResult.Port1
c.localPort = natResult.LocalPort
log.Printf("[client] NAT type=%s, publicIP=%s, publicPort=%d, localPort=%d", c.natType, c.publicIP, c.publicPort, c.localPort)
// 2. WSS Connect
scheme := "ws"
@@ -114,6 +133,7 @@ func (c *Client) connectAndRun() error {
RelayEnabled: c.cfg.RelayEnabled,
SuperRelay: c.cfg.SuperRelay,
PublicIP: c.publicIP,
PublicPort: c.publicPort,
}
rspData, err := c.conn.Request(
@@ -166,10 +186,12 @@ func (c *Client) connectAndRun() error {
func (c *Client) sendReportBasic() {
hostname, _ := os.Hostname()
report := protocol.ReportBasic{
OS: runtime.GOOS,
LanIP: getLocalIP(),
Version: config.Version,
HasIPv4: 1,
OS: runtime.GOOS,
LanIP: getLocalIP(),
Version: config.Version,
HasIPv4: 1,
PublicIP: c.publicIP,
PublicPort: c.publicPort,
}
_ = hostname // for future use
c.conn.Write(protocol.MsgReport, protocol.SubReportBasic, report)
@@ -203,6 +225,70 @@ func (c *Client) registerHandlers() {
return nil
})
// Handle SDWAN config push
c.conn.OnMessage(protocol.MsgPush, protocol.SubPushSDWANConfig, func(data []byte) error {
var cfg protocol.SDWANConfig
if err := protocol.DecodePayload(data, &cfg); err != nil {
return err
}
if cfg.GatewayCIDR == "" {
return nil
}
log.Printf("[client] sdwan config received: gateway=%s nodes=%d mode=%s", cfg.GatewayCIDR, len(cfg.Nodes), cfg.Mode)
_ = os.WriteFile("sdwan.json", data[protocol.HeaderSize:], 0644)
// apply control+data plane
if err := c.applySDWAN(cfg); err != nil {
log.Printf("[client] sdwan apply failed: %v", err)
}
return nil
})
// SDWAN peer online/update event
c.conn.OnMessage(protocol.MsgPush, protocol.SubPushSDWANPeer, func(data []byte) error {
var p protocol.SDWANPeer
if err := protocol.DecodePayload(data, &p); err != nil {
return err
}
if p.Node == "" || p.Node == c.cfg.Node || p.IP == "" {
return nil
}
_ = runCmd("ip", "route", "replace", p.IP+"/32", "dev", "optun")
return nil
})
// SDWAN peer offline/delete event
c.conn.OnMessage(protocol.MsgPush, protocol.SubPushSDWANDel, func(data []byte) error {
var p protocol.SDWANPeer
if err := protocol.DecodePayload(data, &p); err != nil {
return err
}
if p.IP != "" {
_ = runCmd("ip", "route", "del", p.IP+"/32", "dev", "optun")
}
if p.Node != "" {
c.tMu.Lock()
if t, ok := c.tunnels[p.Node]; ok {
t.Close()
delete(c.tunnels, p.Node)
}
c.tMu.Unlock()
}
return nil
})
// SDWAN packet from server, inject to local TUN
c.conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANData, func(data []byte) error {
var pkt protocol.SDWANPacket
if err := protocol.DecodePayload(data, &pkt); err != nil {
return err
}
if len(pkt.Payload) == 0 {
return nil
}
return c.writeTUN(pkt.Payload)
})
// Handle edit app push
c.conn.OnMessage(protocol.MsgPush, protocol.SubPushEditApp, func(data []byte) error {
var app protocol.AppConfig
@@ -315,6 +401,7 @@ func (c *Client) connectApp(app config.AppConfig) {
PeerPort: rsp.Peer.Port,
PeerNAT: rsp.Peer.NATType,
SelfNAT: c.natType,
SelfPort: c.localPort,
IsInitiator: true,
})
@@ -340,7 +427,7 @@ func (c *Client) connectApp(app config.AppConfig) {
c.reportConnect(app, protocol.ReportConnect{
PeerNode: app.PeerNode, LinkMode: result.Mode,
RTT: int(result.RTT.Milliseconds()),
RTT: int(result.RTT.Milliseconds()),
NATType: c.natType, PeerNATType: rsp.Peer.NATType,
})
@@ -405,6 +492,7 @@ func (c *Client) handlePunchRequest(req protocol.ConnectReq) {
PeerPort: req.Peer.Port,
PeerNAT: req.Peer.NATType,
SelfNAT: c.natType,
SelfPort: c.localPort,
IsInitiator: false,
})
@@ -444,6 +532,161 @@ func (c *Client) reportConnect(app config.AppConfig, rc protocol.ReportConnect)
c.conn.Write(protocol.MsgReport, protocol.SubReportConnect, rc)
}
func (c *Client) applySDWAN(cfg protocol.SDWANConfig) error {
selfIP := ""
for _, n := range cfg.Nodes {
if n.Node == c.cfg.Node {
selfIP = strings.TrimSpace(n.IP)
break
}
}
if selfIP == "" {
return fmt.Errorf("node %s not found in sdwan nodes", c.cfg.Node)
}
if err := runCmd("ip", "tuntap", "add", "dev", "optun", "mode", "tun"); err != nil {
if !(strings.Contains(err.Error(), "File exists") || strings.Contains(err.Error(), "Device or resource busy")) {
return err
}
}
_ = runCmd("ip", "link", "set", "dev", "optun", "mtu", "1420")
if err := runCmd("ip", "addr", "replace", fmt.Sprintf("%s/32", selfIP), "dev", "optun"); err != nil {
return err
}
if err := runCmd("ip", "link", "set", "dev", "optun", "up"); err != nil {
return err
}
pfx, err := netip.ParsePrefix(cfg.GatewayCIDR)
if err != nil {
return fmt.Errorf("invalid gateway cidr: %s", cfg.GatewayCIDR)
}
// prefer /32 host routes for full-mesh precision
for _, n := range cfg.Nodes {
ip := strings.TrimSpace(n.IP)
if ip == "" || ip == selfIP {
continue
}
_ = runCmd("ip", "route", "replace", ip+"/32", "dev", "optun")
}
// fallback broad route for hub mode / compatibility
if err := runCmd("ip", "route", "replace", pfx.String(), "dev", "optun"); err != nil {
return err
}
c.sdwanMu.Lock()
c.sdwan = cfg
c.sdwanIP = selfIP
c.sdwanMu.Unlock()
if err := c.ensureTUNReader(); err != nil {
return err
}
log.Printf("[client] sdwan applied: optun=%s route=%s dev optun", selfIP, pfx.String())
return nil
}
func (c *Client) ensureTUNReader() error {
c.tunMu.Lock()
defer c.tunMu.Unlock()
if c.tunFile != nil {
return nil
}
f, err := os.OpenFile("/dev/net/tun", os.O_RDWR, 0)
if err != nil {
return err
}
ifr, err := unix.NewIfreq("optun")
if err != nil {
f.Close()
return err
}
ifr.SetUint16(unix.IFF_TUN | unix.IFF_NO_PI)
if err := unix.IoctlIfreq(int(f.Fd()), unix.TUNSETIFF, ifr); err != nil {
f.Close()
return err
}
c.tunFile = f
c.wg.Add(1)
go c.tunReadLoop()
return nil
}
func (c *Client) tunReadLoop() {
defer c.wg.Done()
buf := make([]byte, 65535)
for {
select {
case <-c.quit:
return
default:
}
c.tunMu.Lock()
f := c.tunFile
c.tunMu.Unlock()
if f == nil {
return
}
n, err := f.Read(buf)
if err != nil {
if c.IsStopping() {
return
}
time.Sleep(100 * time.Millisecond)
continue
}
if n < 20 {
continue
}
pkt := buf[:n]
version := pkt[0] >> 4
if version != 4 {
continue
}
dstIP := net.IP(pkt[16:20]).String()
srcIP := net.IP(pkt[12:16]).String()
c.sdwanMu.RLock()
self := c.sdwanIP
c.sdwanMu.RUnlock()
if dstIP == self {
continue
}
_ = c.conn.Write(protocol.MsgTunnel, protocol.SubTunnelSDWANData, protocol.SDWANPacket{
SrcIP: srcIP,
DstIP: dstIP,
Payload: append([]byte(nil), pkt...),
})
}
}
func (c *Client) writeTUN(payload []byte) error {
c.tunMu.Lock()
f := c.tunFile
c.tunMu.Unlock()
if f == nil {
return nil
}
_, err := f.Write(payload)
return err
}
func (c *Client) IsStopping() bool {
select {
case <-c.quit:
return true
default:
return false
}
}
func runCmd(name string, args ...string) error {
cmd := exec.Command(name, args...)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%s %v: %w: %s", name, args, err, strings.TrimSpace(string(out)))
}
return nil
}
// Stop shuts down the client.
func (c *Client) Stop() {
close(c.quit)
@@ -458,6 +701,12 @@ func (c *Client) Stop() {
t.Close()
}
c.tMu.Unlock()
c.tunMu.Lock()
if c.tunFile != nil {
_ = c.tunFile.Close()
c.tunFile = nil
}
c.tunMu.Unlock()
c.wg.Wait()
}