// inp2ps — INP2P Signaling Server package main import ( "context" "encoding/json" "flag" "fmt" "io" "log" "net" "net/http" "os" "os/signal" "sync" "syscall" "time" "github.com/openp2p-cn/inp2p/internal/server" "github.com/openp2p-cn/inp2p/internal/store" "github.com/openp2p-cn/inp2p/pkg/auth" "github.com/openp2p-cn/inp2p/pkg/config" "github.com/openp2p-cn/inp2p/pkg/nat" "github.com/openp2p-cn/inp2p/pkg/protocol" ) type rateLimiter struct { mu sync.Mutex m map[string]*rateEntry max int window time.Duration } type rateEntry struct { count int reset time.Time } func newRateLimiter(max int, window time.Duration) *rateLimiter { return &rateLimiter{m: make(map[string]*rateEntry), max: max, window: window} } func (r *rateLimiter) Allow(key string) bool { r.mu.Lock() defer r.mu.Unlock() e, ok := r.m[key] now := time.Now() if !ok || now.After(e.reset) { r.m[key] = &rateEntry{count: 1, reset: now.Add(r.window)} return true } if e.count >= r.max { return false } e.count++ return true } func clientIP(addr string) string { host, _, err := net.SplitHostPort(addr) if err != nil { return addr } return host } func main() { cfg := config.DefaultServerConfig() flag.IntVar(&cfg.WSPort, "ws-port", cfg.WSPort, "WebSocket signaling port") flag.IntVar(&cfg.WebPort, "web-port", cfg.WebPort, "Web console port") flag.IntVar(&cfg.STUNUDP1, "stun-udp1", cfg.STUNUDP1, "UDP STUN port 1") flag.IntVar(&cfg.STUNUDP2, "stun-udp2", cfg.STUNUDP2, "UDP STUN port 2") flag.IntVar(&cfg.STUNTCP1, "stun-tcp1", cfg.STUNTCP1, "TCP STUN port 1") flag.IntVar(&cfg.STUNTCP2, "stun-tcp2", cfg.STUNTCP2, "TCP STUN port 2") flag.StringVar(&cfg.DBPath, "db", cfg.DBPath, "SQLite database path") flag.StringVar(&cfg.CertFile, "cert", "", "TLS certificate file") flag.StringVar(&cfg.KeyFile, "key", "", "TLS key file") flag.IntVar(&cfg.LogLevel, "log-level", cfg.LogLevel, "Log level (0=debug 1=info 2=warn 3=error)") token := flag.Uint64("token", 0, "Master authentication token (uint64)") user := flag.String("user", "", "Username for token generation (requires -password)") pass := flag.String("password", "", "Password for token generation") bootstrapAdmin := flag.String("bootstrap-admin", "", "Bootstrap system admin username (letters only, >=6)") bootstrapPass := flag.String("bootstrap-password", "", "Bootstrap system admin password") version := flag.Bool("version", false, "Print version and exit") flag.Parse() if *version { fmt.Printf("inp2ps version %s\ncommit: %s\nbuild: %s\ngo: %s\n", config.Version, config.GitCommit, config.BuildTime, config.GoVersion) os.Exit(0) } // Bootstrap system admin (optional) if *bootstrapAdmin != "" { if !server.IsValidGlobalUsername(*bootstrapAdmin) { log.Fatalf("[main] invalid bootstrap-admin username (letters only, >=6)") } if *bootstrapPass == "" { log.Fatalf("[main] bootstrap-password required") } st, err := store.Open(cfg.DBPath) if err != nil { log.Fatalf("[main] open store failed: %v", err) } _ = st.SetSetting("bootstrapped_admin", "1") // ensure default tenant exists if _, gErr := st.GetTenantByID(1); gErr != nil { _, _, _, _ = st.CreateTenantWithUsers("default", "admin", "admin") } // update/create admin user in tenant 1 users, _ := st.ListUsers(1) var adminID int64 for _, u := range users { if u.Role == "admin" { adminID = u.ID break } } if adminID > 0 { _ = st.UpdateUserEmail(adminID, *bootstrapAdmin) _ = st.UpdateUserPassword(adminID, *bootstrapPass) log.Printf("[main] bootstrapped admin updated: %s", *bootstrapAdmin) } else { _, err = st.CreateUser(1, "admin", *bootstrapAdmin, *bootstrapPass, 1) if err != nil { log.Fatalf("[main] bootstrap admin create failed: %v", err) } log.Printf("[main] bootstrapped admin created: %s", *bootstrapAdmin) } os.Exit(0) } // Token: either direct value or generated from user+password if *token > 0 { cfg.Token = *token } else if *user != "" && *pass != "" { cfg.Token = auth.MakeToken(*user, *pass) log.Printf("[main] token generated from credentials: %d", cfg.Token) } cfg.FillFromEnv() if err := cfg.Validate(); err != nil { log.Fatalf("[main] config error: %v", err) } log.Printf("[main] inp2ps v%s starting", config.Version) log.Printf("[main] WSS :%d | STUN UDP :%d,%d | STUN TCP :%d,%d", cfg.WSPort, cfg.STUNUDP1, cfg.STUNUDP2, cfg.STUNTCP1, cfg.STUNTCP2) // ─── STUN Servers ─── stunQuit := make(chan struct{}) startSTUN := func(proto string, port int, fn func(int, <-chan struct{}) error) { go func() { log.Printf("[main] %s STUN listening on :%d", proto, port) if err := fn(port, stunQuit); err != nil { log.Printf("[main] %s STUN :%d error: %v", proto, port, err) } }() } startSTUN("UDP", cfg.STUNUDP1, nat.ServeUDPSTUN) if cfg.STUNUDP2 != cfg.STUNUDP1 { startSTUN("UDP", cfg.STUNUDP2, nat.ServeUDPSTUN) } startSTUN("TCP", cfg.STUNTCP1, nat.ServeTCPSTUN) if cfg.STUNTCP2 != cfg.STUNTCP1 { startSTUN("TCP", cfg.STUNTCP2, nat.ServeTCPSTUN) } // ─── Signaling Server ─── // ─── Signaling Server ─── srv := server.New(cfg) srv.StartCleanup() // Admin-only Middleware (System Admin session only) adminMiddleware := func(next http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/api/v1/auth/login" { next(w, r) return } ac, ok := srv.ResolveAccess(r, cfg.Token) if !ok || ac.Kind != "session" || ac.Role != "admin" || ac.TenantID != 1 { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusUnauthorized) fmt.Fprintf(w, `{"error":401,"message":"unauthorized"}`) return } r = r.WithContext(context.WithValue(r.Context(), server.ServerCtxKeyAccess{}, ac)) next(w, r) } } // Tenant Middleware (session/apikey only, no operator role) tenantMiddleware := func(next http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/api/v1/auth/login" { next(w, r) return } ac, ok := srv.ResolveAccess(r, cfg.Token) if !ok { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusUnauthorized) fmt.Fprintf(w, `{"error":401,"message":"unauthorized"}`) return } // reject master token for tenant APIs if ac.Kind == "master" { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusUnauthorized) fmt.Fprintf(w, `{"error":401,"message":"unauthorized"}`) return } r = r.WithContext(context.WithValue(r.Context(), server.ServerCtxKeyAccess{}, ac)) next(w, r) } } getTenantID := func(r *http.Request) int64 { tok := server.BearerToken(r) if tok == "" { return 0 } if ac, ok := srv.ResolveTenantAccessToken(tok); ok && ac.Kind != "master" { return ac.TenantID } return 0 } listNodesOut := func(nodes []*server.NodeInfo, tenantID int64) []map[string]any { out := make([]map[string]any, 0, len(nodes)) for _, n := range nodes { item := map[string]any{ "name": n.Name, "displayName": n.Name, "publicIP": n.PublicIP, "publicPort": n.PublicPort, "natType": n.NATType, "tenantId": n.TenantID, "version": n.Version, "relayEnabled": n.RelayEnabled, "superRelay": n.SuperRelay, "loginTime": n.LoginTime, "lastHeartbeat": n.LastHeartbeat, "nodeUUID": "", "alias": "", "virtualIP": "", } if tenantID > 0 && srv.Store() != nil { if nc, err := srv.Store().GetNodeCredentialByName(tenantID, n.Name); err == nil && nc != nil { item["nodeUUID"] = nc.NodeUUID item["alias"] = nc.Alias item["virtualIP"] = nc.VirtualIP if nc.Alias != "" { item["displayName"] = nc.Alias } } } out = append(out, item) } return out } mux := http.NewServeMux() mux.HandleFunc("/ws", srv.HandleWS) // Serve Static Web Console webDir := "/root/.openclaw/workspace/inp2p/web" mux.Handle("/", http.FileServer(http.Dir(webDir))) // Tenant APIs (API key auth inside handlers) mux.HandleFunc("/api/v1/admin/tenants", adminMiddleware(srv.HandleAdminCreateTenant)) mux.HandleFunc("/api/v1/admin/tenants/", adminMiddleware(srv.HandleAdminCreateAPIKey)) mux.HandleFunc("/api/v1/admin/users", adminMiddleware(srv.HandleAdminUsers)) mux.HandleFunc("/api/v1/admin/users/", adminMiddleware(srv.HandleAdminUsers)) mux.HandleFunc("/api/v1/admin/settings", adminMiddleware(srv.HandleAdminSettings)) mux.HandleFunc("/api/v1/admin/audit", adminMiddleware(srv.HandleAdminAudit)) mux.HandleFunc("/api/v1/tenants/enroll", srv.HandleTenantEnroll) // enroll consume with rate-limit rl := newRateLimiter(10, time.Minute) mux.HandleFunc("/api/v1/enroll/consume", func(w http.ResponseWriter, r *http.Request) { ip := clientIP(r.RemoteAddr) if !rl.Allow(ip) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusTooManyRequests) fmt.Fprintf(w, `{"error":1,"message":"too many requests"}`) return } srv.HandleEnrollConsume(w, r) }) mux.HandleFunc("/api/v1/enroll/consume/", func(w http.ResponseWriter, r *http.Request) { ip := clientIP(r.RemoteAddr) if !rl.Allow(ip) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusTooManyRequests) fmt.Fprintf(w, `{"error":1,"message":"too many requests"}`) return } srv.HandleEnrollConsume(w, r) }) mux.HandleFunc("/api/v1/nodes/alias", tenantMiddleware(srv.HandleNodeMeta)) mux.HandleFunc("/api/v1/nodes/ip", tenantMiddleware(srv.HandleNodeMeta)) mux.HandleFunc("/api/v1/auth/login", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } // single mode: username/password login var reqUser struct { Username string `json:"username"` Password string `json:"password"` } body, _ := io.ReadAll(r.Body) _ = json.Unmarshal(body, &reqUser) if reqUser.Username == "" || reqUser.Password == "" { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusBadRequest) fmt.Fprintf(w, `{"error":1,"message":"username and password required"}`) return } if !server.IsValidGlobalUsername(reqUser.Username) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusBadRequest) fmt.Fprintf(w, `{"error":1,"message":"username must be letters only and >=6"}`) return } if srv.Store() == nil { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) fmt.Fprintf(w, `{"error":1,"message":"store not ready"}`) return } u, err := srv.Store().VerifyUserPasswordGlobal(reqUser.Username, reqUser.Password) if err != nil || u == nil || u.Status != 1 { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusUnauthorized) fmt.Fprintf(w, `{"error":1,"message":"invalid credentials"}`) return } sessionToken, exp, err := srv.Store().CreateSessionToken(u.ID, u.TenantID, u.Role, 24*time.Hour) if err != nil { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) fmt.Fprintf(w, `{"error":1,"message":"create session failed"}`) return } ten, _ := srv.Store().GetTenantByID(u.TenantID) resp := struct { Error int `json:"error"` Message string `json:"message"` Token string `json:"token"` TokenType string `json:"token_type"` ExpiresAt int64 `json:"expires_at"` Role string `json:"role"` Status int `json:"status"` Subnet string `json:"subnet"` }{0, "ok", sessionToken, "session", exp, u.Role, u.Status, ""} if ten != nil { resp.Subnet = ten.Subnet } b, _ := json.Marshal(resp) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) w.Write(b) }) mux.HandleFunc("/api/v1/health", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") fmt.Fprintf(w, `{"status":"ok","version":"%s","nodes":%d}`, config.Version, len(srv.GetOnlineNodes())) })) mux.HandleFunc("/api/v1/nodes", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") // tenant filter by session/apikey tenantID := getTenantID(r) if tenantID > 0 { nodes := srv.GetOnlineNodesByTenant(tenantID) _ = json.NewEncoder(w).Encode(map[string]any{"nodes": listNodesOut(nodes, tenantID)}) return } nodes := srv.GetOnlineNodes() _ = json.NewEncoder(w).Encode(map[string]any{"nodes": listNodesOut(nodes, 0)}) })) mux.HandleFunc("/api/v1/sdwans", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") // tenant filter by session/apikey tenantID := getTenantID(r) if tenantID > 0 { _ = json.NewEncoder(w).Encode(srv.GetSDWANTenant(tenantID)) return } _ = json.NewEncoder(w).Encode(srv.GetSDWAN()) })) mux.HandleFunc("/api/v1/sdwan/edit", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var req protocol.SDWANConfig if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if req.Mode == "hub" && req.HubNode == "" { http.Error(w, "hub mode requires hubNode", http.StatusBadRequest) return } // subnet proxy validation (basic) for _, sp := range req.SubnetProxies { if sp.Node == "" || sp.LocalCIDR == "" || sp.VirtualCIDR == "" { http.Error(w, "subnet proxy requires node/localCIDR/virtualCIDR", http.StatusBadRequest) return } _, lnet, lerr := net.ParseCIDR(sp.LocalCIDR) _, vnet, verr := net.ParseCIDR(sp.VirtualCIDR) if lerr != nil || verr != nil || lnet == nil || vnet == nil { http.Error(w, "subnet proxy CIDR invalid", http.StatusBadRequest) return } lOnes, _ := lnet.Mask.Size() vOnes, _ := vnet.Mask.Size() if lOnes != vOnes { http.Error(w, "subnet proxy CIDR mask mismatch", http.StatusBadRequest) return } } // tenant filter by session/apikey tenantID := getTenantID(r) if tenantID > 0 { ac := server.GetAccessContext(r) actorType, actorID := "", "" if ac != nil { actorType = ac.Kind actorID = fmt.Sprintf("%d", ac.UserID) } if err := srv.SetSDWANTenant(tenantID, req, actorType, actorID, r.RemoteAddr); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"error": 0, "message": "ok"}) return } if err := srv.SetSDWAN(req); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"error": 0, "message": "ok"}) })) // Remote Config Push API mux.HandleFunc("/api/v1/nodes/apps", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var req struct { Node string `json:"node"` Apps []protocol.AppConfig `json:"apps"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } node := srv.GetNode(req.Node) if node == nil { http.Error(w, "node not found", http.StatusNotFound) return } // tenant filter by session/apikey tenantID := getTenantID(r) if tenantID > 0 && node.TenantID != tenantID { http.Error(w, "node not found", http.StatusNotFound) return } // Push to client _ = node.Conn.Write(protocol.MsgPush, protocol.SubPushConfig, req.Apps) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"error": 0, "message": "config push sent"}) })) // Kick (disconnect) a node mux.HandleFunc("/api/v1/nodes/kick", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var req struct { Node string `json:"node"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } node := srv.GetNode(req.Node) if node == nil { http.Error(w, "node not found or offline", http.StatusNotFound) return } // tenant filter by session/apikey tenantID := getTenantID(r) if tenantID > 0 && node.TenantID != tenantID { http.Error(w, "node not found", http.StatusNotFound) return } node.Conn.Close() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"error": 0, "message": "node kicked"}) })) // Trigger P2P connect between two nodes mux.HandleFunc("/api/v1/connect", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var req struct { From string `json:"from"` To string `json:"to"` SrcPort int `json:"srcPort"` DstPort int `json:"dstPort"` AppName string `json:"appName"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } fromNode := srv.GetNode(req.From) if fromNode == nil { http.Error(w, "source node offline", http.StatusNotFound) return } // tenant filter by session/apikey tenantID := getTenantID(r) if tenantID > 0 && fromNode.TenantID != tenantID { http.Error(w, "node not found", http.StatusNotFound) return } app := protocol.AppConfig{ AppName: req.AppName, Protocol: "tcp", SrcPort: req.SrcPort, PeerNode: req.To, DstHost: "127.0.0.1", DstPort: req.DstPort, Enabled: 1, } // enforce same-tenant target if tenantID > 0 { toNode := srv.GetNode(req.To) if toNode == nil || toNode.TenantID != tenantID { http.Error(w, "node not found", http.StatusNotFound) return } } if err := srv.PushConnect(fromNode, req.To, app); err != nil { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusBadGateway) _ = json.NewEncoder(w).Encode(map[string]any{"error": 1, "message": err.Error()}) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"error": 0, "message": "connect request sent"}) })) // Server uptime + detailed stats mux.HandleFunc("/api/v1/stats", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) { nodes := srv.GetOnlineNodes() coneCount, symmCount, unknCount := 0, 0, 0 relayCount := 0 for _, n := range nodes { switch n.NATType { case 1: coneCount++ case 2: symmCount++ default: unknCount++ } if n.RelayEnabled || n.SuperRelay { relayCount++ } } sdwan := srv.GetSDWAN() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ "nodes": len(nodes), "relay": relayCount, "cone": coneCount, "symmetric": symmCount, "unknown": unknCount, "sdwan": sdwan.Enabled, "version": config.Version, }) })) // ─── HTTP Listener ─── ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.WSPort)) if err != nil { log.Fatalf("[main] listen :%d: %v", cfg.WSPort, err) } log.Printf("[main] signaling server on :%d (no TLS — use reverse proxy for production)", cfg.WSPort) // Enable TCP keepalive at server level httpSrv := &http.Server{ Handler: mux, ReadHeaderTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 120 * time.Second, } go func() { if err := httpSrv.Serve(ln); err != http.ErrServerClosed { log.Fatalf("[main] serve: %v", err) } }() // ─── Graceful Shutdown ─── sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh log.Println("[main] shutting down...") close(stunQuit) srv.Stop() httpSrv.Shutdown(context.Background()) log.Println("[main] goodbye") }