Files
inp2p/pkg/signal/conn.go

208 lines
4.6 KiB
Go

// Package signal provides the WSS signaling connection between client and server.
package signal
import (
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/openp2p-cn/inp2p/pkg/protocol"
)
// Conn wraps a WebSocket connection with message framing.
//
// Incoming messages are routed by their (MainType, SubType) pair in one of
// two ways: to a one-shot waiter installed by Request, or to a handler
// registered with OnMessage. ReadLoop consults waiters before handlers.
type Conn struct {
// ws is the underlying WebSocket connection.
ws *websocket.Conn
// writeMu serializes writes to ws: data frames sent through WriteRaw and
// the keepalive pings started by ReadLoop.
writeMu sync.Mutex
// handlers maps a message kind to the callback registered via OnMessage.
handlers map[msgKey]Handler
// hMu guards handlers.
hMu sync.RWMutex
// quit is closed by Close; Request, ReadLoop and IsClosed select on it.
quit chan struct{}
// once ensures the shutdown in Close runs a single time.
once sync.Once
// Node and Token are not read or written anywhere in this file.
// NOTE(review): presumably they identify the peer that owns this
// connection — confirm against the callers that set them.
Node string
Token uint64
// waiters for synchronous request-response: Request registers a channel
// under the expected response kind and ReadLoop delivers the matching
// message to it.
waiters map[msgKey]chan []byte
// wMu guards waiters.
wMu sync.Mutex
}
// msgKey identifies a message kind by its (MainType, SubType) pair. It is the
// key type of the handler and waiter maps held by Conn.
type msgKey struct {
// main is the message's MainType.
main uint16
// sub is the message's SubType.
sub uint16
}
// Handler processes an incoming message. data includes header + payload.
//
// Handlers are invoked synchronously from ReadLoop, so no further message is
// read until the handler returns. A non-nil error is logged by ReadLoop and
// does not stop the loop.
type Handler func(data []byte) error
// NewConn builds a Conn around an already-established websocket.
//
// The returned Conn starts with empty handler and waiter tables and an open
// quit channel; Node and Token are left at their zero values.
func NewConn(ws *websocket.Conn) *Conn {
	c := new(Conn)
	c.ws = ws
	c.quit = make(chan struct{})
	c.handlers = make(map[msgKey]Handler)
	c.waiters = make(map[msgKey]chan []byte)
	return c
}
// OnMessage installs h as the handler for messages of kind
// (mainType, subType). A handler previously registered for the same kind is
// replaced.
func (c *Conn) OnMessage(mainType, subType uint16, h Handler) {
	key := msgKey{main: mainType, sub: subType}

	c.hMu.Lock()
	defer c.hMu.Unlock()
	c.handlers[key] = h
}
// Write encodes payload into a frame of kind (mainType, subType) using
// protocol.Encode and sends that frame with WriteRaw.
//
// It returns the encoding error if encoding fails, otherwise whatever
// WriteRaw returns.
func (c *Conn) Write(mainType, subType uint16, payload interface{}) error {
	encoded, encodeErr := protocol.Encode(mainType, subType, payload)
	if encodeErr == nil {
		return c.WriteRaw(encoded)
	}
	return encodeErr
}
// WriteRaw sends data as a single binary WebSocket message.
//
// The write lock is held for the whole call so that concurrent writers do not
// interleave, and each write is given a 10-second deadline.
func (c *Conn) WriteRaw(data []byte) error {
	c.writeMu.Lock()
	defer c.writeMu.Unlock()

	deadline := time.Now().Add(10 * time.Second)
	c.ws.SetWriteDeadline(deadline)
	return c.ws.WriteMessage(websocket.BinaryMessage, data)
}
// Request sends a message of kind (mainType, subType) and blocks until a
// message of kind (rspMain, rspSub) is received by ReadLoop, timeout elapses,
// or the connection is closed.
//
// On success it returns the complete response message (header + payload). It
// returns the write error if sending fails, a "request timeout" error if no
// response arrives in time, and a "connection closed" error if Close is
// called while waiting.
//
// Responses are matched by kind only. If a second Request for the same
// response kind starts while one is outstanding, it replaces the earlier
// waiter and the earlier call will time out.
func (c *Conn) Request(mainType, subType uint16, payload interface{},
	rspMain, rspSub uint16, timeout time.Duration) ([]byte, error) {
	key := msgKey{rspMain, rspSub}
	// Buffered so ReadLoop's non-blocking send succeeds even if this
	// goroutine has not reached the select below yet.
	ch := make(chan []byte, 1)

	c.wMu.Lock()
	c.waiters[key] = ch
	c.wMu.Unlock()
	defer func() {
		c.wMu.Lock()
		// Only remove our own registration: a newer Request for the same
		// response kind may have replaced it, and deleting unconditionally
		// would make that newer call miss its response.
		if c.waiters[key] == ch {
			delete(c.waiters, key)
		}
		c.wMu.Unlock()
	}()

	if err := c.Write(mainType, subType, payload); err != nil {
		return nil, err
	}

	// An explicit timer (rather than time.After) is released as soon as we
	// return instead of lingering until the timeout expires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case data := <-ch:
		return data, nil
	case <-timer.C:
		return nil, fmt.Errorf("request timeout %d:%d → %d:%d", mainType, subType, rspMain, rspSub)
	case <-c.quit:
		return nil, fmt.Errorf("connection closed")
	}
}
// ReadLoop reads messages and dispatches them to waiters and handlers. It
// blocks until a read fails or Close() is called.
//
// It returns nil when the read failed because the connection was closed via
// Close, and the read error otherwise. While it runs, a background goroutine
// sends a ping every 10 seconds; each pong pushes the read deadline 90 seconds
// into the future, so a peer that stops answering makes the read fail. The
// ping goroutine is stopped when ReadLoop returns.
func (c *Conn) ReadLoop() error {
	// Read deadline is 90s against a 10s ping interval, so several
	// consecutive pongs may be lost before the read times out.
	const readTimeout = 90 * time.Second

	_ = c.ws.SetReadDeadline(time.Now().Add(readTimeout))
	c.ws.SetPongHandler(func(string) error {
		_ = c.ws.SetReadDeadline(time.Now().Add(readTimeout))
		return nil
	})

	// done ties the pinger's lifetime to this call: without it the goroutine
	// would keep pinging a dead socket until somebody calls Close.
	done := make(chan struct{})
	defer close(done)
	go c.pingLoop(done)

	for {
		_, msg, err := c.ws.ReadMessage()
		if err != nil {
			select {
			case <-c.quit:
				// The read failed because Close shut the socket: not an error.
				return nil
			default:
				return err
			}
		}
		c.dispatch(msg)
	}
}

// pingLoop sends a ping frame every 10 seconds to keep NAT/WSS mappings
// alive. It exits when done or c.quit is closed, or after the first failed
// ping.
func (c *Conn) pingLoop(done <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-c.quit:
			return
		case <-done:
			return
		case <-ticker.C:
			c.writeMu.Lock()
			_ = c.ws.SetWriteDeadline(time.Now().Add(5 * time.Second))
			err := c.ws.WriteMessage(websocket.PingMessage, []byte(time.Now().Format("20060102150405")))
			c.writeMu.Unlock()
			if err != nil {
				log.Printf("[signal] ping failed: %v, will reconnect", err)
				// A failed write is permanent for a gorilla connection, so
				// further pings would only repeat this log line.
				return
			}
		}
	}
}

// dispatch routes one received message: to a pending Request waiter for its
// kind if there is one, otherwise to the handler registered for that kind.
// Messages that are shorter than a header, fail header decoding, or have no
// waiter and no handler are dropped silently.
func (c *Conn) dispatch(msg []byte) {
	if len(msg) < protocol.HeaderSize {
		return
	}
	h, err := protocol.DecodeHeader(msg)
	if err != nil {
		return
	}
	key := msgKey{h.MainType, h.SubType}

	// Check waiters first (synchronous request-response). The waiter is
	// one-shot, so it is removed before the message is handed over.
	c.wMu.Lock()
	ch, waiting := c.waiters[key]
	if waiting {
		delete(c.waiters, key)
	}
	c.wMu.Unlock()
	if waiting {
		// Non-blocking send: never stall the read loop on a waiter.
		select {
		case ch <- msg:
		default:
		}
		return
	}

	// Dispatch to registered handler.
	c.hMu.RLock()
	handler, ok := c.handlers[key]
	c.hMu.RUnlock()
	if !ok {
		return
	}
	if err := handler(msg); err != nil {
		log.Printf("[signal] handler %d:%d error: %v", h.MainType, h.SubType, err)
	}
}
// Close shuts the connection down: it closes the quit channel, which wakes
// anything selecting on it, and then closes the underlying websocket.
//
// It is safe to call more than once; only the first call has any effect. The
// error from closing the websocket is discarded.
func (c *Conn) Close() {
	shutdown := func() {
		close(c.quit)
		c.ws.Close()
	}
	c.once.Do(shutdown)
}
// IsClosed reports whether Close has been called on this connection. It never
// blocks: it only polls the quit channel.
func (c *Conn) IsClosed() bool {
	closed := false
	select {
	case <-c.quit:
		closed = true
	default:
	}
	return closed
}
// ─── Helpers ───
// ParsePayload decodes the JSON payload of a raw message into a value of
// type T.
//
// data is expected to be a full message; the first protocol.HeaderSize bytes
// are skipped. If data holds no bytes beyond the header, the zero value of T
// is returned with a nil error. Otherwise the result of json.Unmarshal is
// returned together with its error.
func ParsePayload[T any](data []byte) (T, error) {
	var out T
	if len(data) > protocol.HeaderSize {
		if err := json.Unmarshal(data[protocol.HeaderSize:], &out); err != nil {
			return out, err
		}
	}
	return out, nil
}