// Package server implements the inp2ps signaling server. package server import ( "fmt" "log" "net" "net/http" "sync" "time" "github.com/gorilla/websocket" "github.com/openp2p-cn/inp2p/internal/store" "github.com/openp2p-cn/inp2p/pkg/auth" "github.com/openp2p-cn/inp2p/pkg/config" "github.com/openp2p-cn/inp2p/pkg/protocol" "github.com/openp2p-cn/inp2p/pkg/signal" ) // NodeInfo represents a connected client node. type NodeInfo struct { Name string `json:"name"` Token uint64 `json:"-"` TenantID int64 `json:"tenantId"` User string `json:"user"` Version string `json:"version"` NATType protocol.NATType `json:"natType"` PublicIP string `json:"publicIP"` PublicPort int `json:"publicPort"` LanIP string `json:"lanIP"` OS string `json:"os"` Mac string `json:"mac"` ShareBandwidth int `json:"shareBandwidth"` RelayEnabled bool `json:"relayEnabled"` SuperRelay bool `json:"superRelay"` HasIPv4 int `json:"hasIPv4"` IPv6 string `json:"ipv6"` LoginTime time.Time `json:"loginTime"` LastHeartbeat time.Time `json:"lastHeartbeat"` Conn *signal.Conn `json:"-"` Apps []protocol.AppConfig `json:"apps"` mu sync.RWMutex `json:"-"` } // IsOnline checks if node has sent heartbeat recently. func (n *NodeInfo) IsOnline() bool { n.mu.RLock() defer n.mu.RUnlock() return time.Since(n.LastHeartbeat) < time.Duration(config.HeartbeatTimeout)*time.Second } // Server is the INP2P signaling server. type Server struct { cfg config.ServerConfig nodes map[string]*NodeInfo mu sync.RWMutex upgrader websocket.Upgrader quit chan struct{} sdwanPath string sdwan *sdwanStore store *store.Store tokens map[uint64]bool } func (s *Server) Store() *store.Store { return s.store } // New creates a new server. func New(cfg config.ServerConfig) *Server { sdwanPath := "/root/.openclaw/workspace/inp2p/sdwan.json" tokens := make(map[uint64]bool) if cfg.Token != 0 { tokens[cfg.Token] = true } for _, t := range cfg.Tokens { tokens[t] = true } st, err := store.Open(cfg.DBPath) if err != nil { log.Printf("[server] open store failed: %v", err) } else { // bootstrap default admin/admin in tenant 1 if _, gErr := st.GetTenantByID(1); gErr != nil { if _, _, _, cErr := st.CreateTenantWithUsers("default", "admin", "admin"); cErr != nil { log.Printf("[server] bootstrap default tenant failed: %v", cErr) } else { log.Printf("[server] bootstrap default tenant created (tenant=1, admin/admin)") } } } return &Server{ cfg: cfg, nodes: make(map[string]*NodeInfo), sdwanPath: sdwanPath, sdwan: newSDWANStore(sdwanPath), store: st, tokens: tokens, upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, ReadBufferSize: 4096, WriteBufferSize: 4096, }, quit: make(chan struct{}), } } // GetNode returns a connected node by name. func (s *Server) GetNode(name string) *NodeInfo { s.mu.RLock() defer s.mu.RUnlock() return s.nodes[name] } // GetOnlineNodes returns all online nodes. func (s *Server) GetOnlineNodes() []*NodeInfo { s.mu.RLock() defer s.mu.RUnlock() var out []*NodeInfo for _, n := range s.nodes { if n.IsOnline() { out = append(out, n) } } return out } // GetNodeForUser returns node if token matches (legacy) or tenant matches. func (s *Server) GetNodeForUser(name string, token uint64) *NodeInfo { s.mu.RLock() defer s.mu.RUnlock() n := s.nodes[name] if n == nil { return nil } if n.Token != token && n.TenantID == 0 { return nil } return n } func (s *Server) GetNodeForTenant(name string, tenantID int64) *NodeInfo { s.mu.RLock() defer s.mu.RUnlock() n := s.nodes[name] if n == nil || n.TenantID != tenantID { return nil } return n } func (s *Server) GetOnlineNodesByTenant(tenantID int64) []*NodeInfo { s.mu.RLock() defer s.mu.RUnlock() var out []*NodeInfo for _, n := range s.nodes { if n.IsOnline() && n.TenantID == tenantID { out = append(out, n) } } return out } // GetRelayNodes returns nodes that can serve as relay. // Priority: same-user private relay → super relay func (s *Server) GetRelayNodes(forUser string, excludeNodes ...string) []*NodeInfo { excludeSet := make(map[string]bool) for _, n := range excludeNodes { excludeSet[n] = true } s.mu.RLock() defer s.mu.RUnlock() var privateRelays, superRelays []*NodeInfo for _, n := range s.nodes { if !n.IsOnline() || excludeSet[n.Name] || !n.RelayEnabled { continue } if n.User == forUser { privateRelays = append(privateRelays, n) } else if n.SuperRelay { superRelays = append(superRelays, n) } } // private first, then super return append(privateRelays, superRelays...) } // GetRelayNodesByTenant returns relay nodes within tenant. func (s *Server) GetRelayNodesByTenant(tenantID int64, excludeNodes ...string) []*NodeInfo { excludeSet := make(map[string]bool) for _, n := range excludeNodes { excludeSet[n] = true } s.mu.RLock() defer s.mu.RUnlock() var relays []*NodeInfo for _, n := range s.nodes { if !n.IsOnline() || excludeSet[n.Name] { continue } if n.TenantID == tenantID && (n.RelayEnabled || n.SuperRelay) { relays = append(relays, n) } } return relays } // HandleWS is the WebSocket handler for client connections. func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { ws, err := s.upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("[server] ws upgrade error: %v", err) return } conn := signal.NewConn(ws) log.Printf("[server] new connection from %s", r.RemoteAddr) // First message must be login _, msg, err := ws.ReadMessage() if err != nil { log.Printf("[server] read login error: %v", err) ws.Close() return } hdr, err := protocol.DecodeHeader(msg) if err != nil || hdr.MainType != protocol.MsgLogin || hdr.SubType != protocol.SubLoginReq { log.Printf("[server] expected login, got %d:%d", hdr.MainType, hdr.SubType) ws.Close() return } var loginReq protocol.LoginReq if err := protocol.DecodePayload(msg, &loginReq); err != nil { log.Printf("[server] decode login: %v", err) ws.Close() return } // Verify token: master token OR tenant API key (DB) OR node_secret (DB) valid := s.tokens[loginReq.Token] log.Printf("[server] login check: token=%d, cfg.Token=%d, valid=%v", loginReq.Token, s.cfg.Token, valid) var tenantID int64 if !valid && s.store != nil { // try api key (string) or node secret if loginReq.NodeSecret != "" { if ten, err := s.store.VerifyNodeSecret(loginReq.Node, loginReq.NodeSecret); err == nil && ten != nil { valid = true tenantID = ten.ID } } if !valid { if ten, err := s.store.VerifyAPIKey(fmt.Sprintf("%d", loginReq.Token)); err == nil && ten != nil { valid = true tenantID = ten.ID } } } if !valid { log.Printf("[server] login denied: %s (token mismatch)", loginReq.Node) conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{ Error: 1, Detail: "invalid token", }) ws.Close() return } // Check duplicate node s.mu.Lock() sdwanCfg := s.sdwan.get() log.Printf("[server] sdwan config: enabled=%v gateway=%s nodes=%d", sdwanCfg.Enabled, sdwanCfg.GatewayCIDR, len(sdwanCfg.Nodes)) if old, exists := s.nodes[loginReq.Node]; exists { log.Printf("[server] replacing existing node %s", loginReq.Node) old.Conn.Close() } node := &NodeInfo{ Name: loginReq.Node, Token: loginReq.Token, TenantID: tenantID, User: loginReq.User, Version: loginReq.Version, NATType: loginReq.NATType, ShareBandwidth: loginReq.ShareBandwidth, RelayEnabled: loginReq.RelayEnabled, SuperRelay: loginReq.SuperRelay, PublicIP: loginReq.PublicIP, PublicPort: loginReq.PublicPort, LoginTime: time.Now(), LastHeartbeat: time.Now(), Conn: conn, } s.nodes[loginReq.Node] = node s.mu.Unlock() if node.PublicIP == "" { // fallback to TCP remote addr if client didn't provide host, _, _ := net.SplitHostPort(r.RemoteAddr) node.PublicIP = host } // Send login response conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{ Error: 0, Ts: time.Now().Unix(), Token: loginReq.Token, User: loginReq.User, Node: loginReq.Node, }) log.Printf("[server] login ok: node=%s, natType=%s, relay=%v, super=%v, version=%s, public=%s:%d", loginReq.Node, loginReq.NATType, loginReq.RelayEnabled, loginReq.SuperRelay, loginReq.Version, node.PublicIP, node.PublicPort) // Notify other nodes s.broadcastNodeOnline(loginReq.Node) // Push current SDWAN config right after login (if exists and enabled) if node.TenantID > 0 { if cfg := s.sdwan.getTenant(node.TenantID); cfg.Enabled && cfg.GatewayCIDR != "" { if err := conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg); err != nil { log.Printf("[server] sdwan config push failed: %v", err) } else { log.Printf("[server] sdwan config pushed to %s", loginReq.Node) } } } else { if cfg := s.sdwan.get(); cfg.Enabled && cfg.GatewayCIDR != "" { if err := conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg); err != nil { log.Printf("[server] sdwan config push failed: %v", err) } else { log.Printf("[server] sdwan config pushed to %s", loginReq.Node) } } } // Event-driven SDWAN peer notification s.announceSDWANNodeOnline(loginReq.Node) // Register message handlers s.registerHandlers(conn, node) // Start read loop (blocks until disconnect) if err := conn.ReadLoop(); err != nil { log.Printf("[server] %s disconnected: %v", loginReq.Node, err) } // Cleanup s.mu.Lock() if current, ok := s.nodes[loginReq.Node]; ok && current == node { delete(s.nodes, loginReq.Node) } s.mu.Unlock() s.announceSDWANNodeOffline(loginReq.Node) log.Printf("[server] %s offline", loginReq.Node) } func (s *Server) registerHandlers(conn *signal.Conn, node *NodeInfo) { // Heartbeat conn.OnMessage(protocol.MsgHeartbeat, protocol.SubHeartbeatPing, func(data []byte) error { node.mu.Lock() node.LastHeartbeat = time.Now() node.mu.Unlock() return conn.Write(protocol.MsgHeartbeat, protocol.SubHeartbeatPong, nil) }) // ReportBasic conn.OnMessage(protocol.MsgReport, protocol.SubReportBasic, func(data []byte) error { var report protocol.ReportBasic if err := protocol.DecodePayload(data, &report); err != nil { return err } node.mu.Lock() node.OS = report.OS node.Mac = report.Mac node.LanIP = report.LanIP node.Version = report.Version node.HasIPv4 = report.HasIPv4 node.IPv6 = report.IPv6 node.mu.Unlock() log.Printf("[server] ReportBasic from %s: os=%s lanIP=%s", node.Name, report.OS, report.LanIP) // Update public IP/port from NAT report (if provided) if report.PublicIP != "" { node.mu.Lock() node.PublicIP = report.PublicIP node.PublicPort = report.PublicPort node.mu.Unlock() } // Always respond (official OpenP2P bug: not responding causes client to disconnect) return conn.Write(protocol.MsgReport, protocol.SubReportBasic, protocol.ReportBasicRsp{Error: 0}) }) // ReportApps conn.OnMessage(protocol.MsgReport, protocol.SubReportApps, func(data []byte) error { var apps []protocol.AppConfig protocol.DecodePayload(data, &apps) node.mu.Lock() node.Apps = apps node.mu.Unlock() log.Printf("[server] ReportApps from %s: %d apps", node.Name, len(apps)) return nil }) // ReportConnect conn.OnMessage(protocol.MsgReport, protocol.SubReportConnect, func(data []byte) error { var rc protocol.ReportConnect protocol.DecodePayload(data, &rc) if rc.Error != "" { log.Printf("[server] ConnectReport ERROR from %s: peer=%s mode=%s err=%s", node.Name, rc.PeerNode, rc.LinkMode, rc.Error) } else { log.Printf("[server] ConnectReport OK from %s: peer=%s mode=%s rtt=%dms", node.Name, rc.PeerNode, rc.LinkMode, rc.RTT) } return nil }) // ConnectReq — client wants to connect to a peer conn.OnMessage(protocol.MsgPush, protocol.SubPushConnectReq, func(data []byte) error { var req protocol.ConnectReq protocol.DecodePayload(data, &req) return s.HandleConnectReq(node, req) }) // RelayNodeReq — client asks for a relay node conn.OnMessage(protocol.MsgRelay, protocol.SubRelayNodeReq, func(data []byte) error { var req protocol.RelayNodeReq protocol.DecodePayload(data, &req) return s.handleRelayNodeReq(conn, node, req) }) // SDWAN data plane packet relay (JSON control payload) conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANData, func(data []byte) error { var pkt protocol.SDWANPacket if err := protocol.DecodePayload(data, &pkt); err != nil { return err } s.RouteSDWANPacket(node, pkt) return nil }) // SDWAN data plane packet relay (raw IP payload) conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANRaw, func(data []byte) error { log.Printf("[sdwan] raw packet from %s, len=%d", node.Name, len(data)) if len(data) <= protocol.HeaderSize { return nil } payload := data[protocol.HeaderSize:] if len(payload) < 20 { return nil } version := payload[0] >> 4 if version != 4 { return nil } srcIP := net.IP(payload[12:16]).String() dstIP := net.IP(payload[16:20]).String() pkt := protocol.SDWANPacket{SrcIP: srcIP, DstIP: dstIP, Payload: payload} s.RouteSDWANPacket(node, pkt) return nil }) } // handleRelayNodeReq finds and returns the best relay node. func (s *Server) handleRelayNodeReq(conn *signal.Conn, requester *NodeInfo, req protocol.RelayNodeReq) error { relays := s.GetRelayNodes(requester.User, requester.Name, req.PeerNode) if len(relays) == 0 { return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{ Error: 1, }) } // Pick the first (best) relay relay := relays[0] totp := auth.GenTOTP(relay.Token, time.Now().Unix()) mode := "private" if relay.User != requester.User { mode = "super" } log.Printf("[server] relay selected: %s (%s) for %s → %s", relay.Name, mode, requester.Name, req.PeerNode) return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{ RelayName: relay.Name, RelayIP: relay.PublicIP, RelayPort: config.DefaultRelayPort, RelayToken: totp, Mode: mode, Error: 0, }) } // PushConnect sends a punch coordination message to a peer node. func (s *Server) PushConnect(fromNode *NodeInfo, toNodeName string, app protocol.AppConfig) error { toNode := s.GetNodeForUser(toNodeName, fromNode.Token) if toNode == nil || !toNode.IsOnline() { return &NodeOfflineError{Node: toNodeName} } if fromNode.TenantID != 0 && toNode.TenantID != fromNode.TenantID { return &NodeOfflineError{Node: toNodeName} } // Push connect request to the destination req := protocol.ConnectReq{ From: fromNode.Name, To: toNodeName, FromIP: fromNode.PublicIP, Peer: protocol.PunchParams{ IP: fromNode.PublicIP, NATType: fromNode.NATType, HasIPv4: fromNode.HasIPv4, Token: auth.GenTOTP(fromNode.Token, time.Now().Unix()), }, AppName: app.AppName, Protocol: app.Protocol, SrcPort: app.SrcPort, DstHost: app.DstHost, DstPort: app.DstPort, } return toNode.Conn.Write(protocol.MsgPush, protocol.SubPushConnectReq, req) } // broadcastNodeOnline notifies interested nodes that a peer came online. func (s *Server) broadcastNodeOnline(nodeName string) { s.mu.RLock() newNode := s.nodes[nodeName] defer s.mu.RUnlock() if newNode == nil { return } for _, n := range s.nodes { if n.Name == nodeName { continue } if n.Token != newNode.Token && (newNode.TenantID == 0 || n.TenantID != newNode.TenantID) { continue } // Check if this node has any app targeting the new node n.mu.RLock() interested := false for _, app := range n.Apps { if app.PeerNode == nodeName { interested = true break } } n.mu.RUnlock() if interested { n.Conn.Write(protocol.MsgPush, protocol.SubPushNodeOnline, struct { Node string `json:"node"` }{Node: nodeName}) } } } // StartCleanup periodically removes stale nodes and checks SDWAN hub health. func (s *Server) StartCleanup() { go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: s.mu.Lock() for name, n := range s.nodes { if !n.IsOnline() { log.Printf("[server] cleanup stale node: %s", name) n.Conn.Close() delete(s.nodes, name) } } s.mu.Unlock() // hub offline -> auto mesh (tenant configs) if s.sdwan != nil { sd := s.sdwan sd.mu.RLock() m := make(map[int64]protocol.SDWANConfig, len(sd.multi)) for k, v := range sd.multi { m[k] = v } sd.mu.RUnlock() for tid, cfg := range m { if cfg.Mode != "hub" || cfg.HubNode == "" { continue } hub := s.GetNode(cfg.HubNode) if hub != nil && hub.IsOnline() && hub.TenantID == tid { continue } // auto fallback to mesh cfg.Mode = "mesh" cfg.HubNode = "" _ = s.sdwan.saveTenant(tid, cfg) s.broadcastSDWANTenant(tid, cfg) log.Printf("[sdwan] hub offline, auto fallback to mesh (tenant=%d)", tid) } } case <-s.quit: return } } }() } // Stop shuts down the server. func (s *Server) Stop() { close(s.quit) s.mu.Lock() for _, n := range s.nodes { n.Conn.Close() } s.mu.Unlock() } type NodeOfflineError struct { Node string } func (e *NodeOfflineError) Error() string { return "node offline: " + e.Node }