From 5bd7c6c7ea0c8cebe78fdf8ecff9b80511660ec0 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Thu, 18 Jun 2026 01:48:09 +0900 Subject: [PATCH] [client] Detect and recover from a stalled signal receive stream (#6459) --- client/internal/engine.go | 7 ++ shared/signal/client/grpc.go | 121 ++++++++++++++++++++--- shared/signal/client/watchdog_test.go | 84 ++++++++++++++++ shared/signal/proto/signalexchange.pb.go | 64 ++++++------ shared/signal/proto/signalexchange.proto | 1 + 5 files changed, 233 insertions(+), 44 deletions(-) create mode 100644 shared/signal/client/watchdog_test.go diff --git a/client/internal/engine.go b/client/internal/engine.go index cf40d898..42712da9 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -1714,6 +1714,13 @@ func (e *Engine) receiveSignalEvents() { return e.ctx.Err() } + // Self-addressed heartbeat: the signal client's receive watchdog + // round-trips this through the server to confirm the receive stream + // is delivering. Liveness is already recorded before this handler. + if msg.GetBody().GetType() == sProto.Body_HEARTBEAT { + return nil + } + conn, ok := e.peerStore.PeerConn(msg.Key) if !ok { return fmt.Errorf("wrongly addressed message %s", msg.Key) diff --git a/shared/signal/client/grpc.go b/shared/signal/client/grpc.go index b245b229..eb18cea0 100644 --- a/shared/signal/client/grpc.go +++ b/shared/signal/client/grpc.go @@ -2,9 +2,11 @@ package client import ( "context" + "errors" "fmt" "io" "sync" + "sync/atomic" "time" "github.com/cenkalti/backoff/v4" @@ -23,7 +25,23 @@ import ( "github.com/netbirdio/netbird/util/wsproxy" ) -const healthCheckTimeout = 5 * time.Second +const ( + // receiveInactivityThreshold is how long the receive stream may be silent + // before the watchdog actively probes it. The gRPC transport can stay + // healthy (keepalive satisfied) while the server stops delivering messages, + // which the transport layer cannot detect. + receiveInactivityThreshold = 30 * time.Second + // receiveProbeTimeout is how long the watchdog waits for its self-addressed + // probe to round-trip back on the stream before declaring the receive + // direction dead. + receiveProbeTimeout = 10 * time.Second + // receiveWatchdogInterval is how often the watchdog evaluates the stream. + receiveWatchdogInterval = 10 * time.Second +) + +// errReceiveStreamStalled is reported when the receive stream is transport-alive +// but no longer delivering messages, so the stream is torn down to reconnect. +var errReceiveStreamStalled = errors.New("signal receive stream stalled") // ConnStateNotifier is a wrapper interface of the status recorder type ConnStateNotifier interface { @@ -52,6 +70,14 @@ type GrpcClient struct { decryptionWorker *Worker decryptionWorkerCancel context.CancelFunc decryptionWg sync.WaitGroup + + // lastReceived holds the Unix-nano timestamp of the last message read from + // the receive stream, used by the receive watchdog. + lastReceived atomic.Int64 + // receiveStalled is set by the receive watchdog when the stream is + // transport-alive but no longer delivering messages. It is the source of + // truth IsHealthy reads, and is cleared once any frame is received again. + receiveStalled atomic.Bool } // NewClient creates a new Signal client @@ -148,9 +174,9 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes // connect to Signal stream identifying ourselves with a public WireGuard key // todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management) - ctx, cancelStream := context.WithCancel(ctx) + streamCtx, cancelStream := context.WithCancel(ctx) defer cancelStream() - stream, err := c.connect(ctx, c.key.PublicKey().String()) + stream, err := c.connect(streamCtx, c.key.PublicKey().String()) if err != nil { log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) return err @@ -164,9 +190,16 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes // Start worker pool if not already started c.startEncryptionWorker(msgHandler) + // Guard the receive direction: the transport can stay healthy while the + // server stops delivering messages. The watchdog reconnects via cancelStream. + c.markReceived() + go c.watchReceiveStream(streamCtx, cancelStream) + // start receiving messages from the Signal stream (from other peers through signal) err = c.receive(stream) if err != nil { + // Check the parent context, not streamCtx: a watchdog-triggered + // cancelStream must reconnect, only a parent cancel is shutdown. if ctx.Err() != nil { log.Debugf("signal connection context has been canceled, this usually indicates shutdown") return nil @@ -252,7 +285,10 @@ func (c *GrpcClient) Ready() bool { return c.signalConn.GetState() == connectivity.Ready || c.signalConn.GetState() == connectivity.Idle } -// IsHealthy probes the gRPC connection and returns false on errors +// IsHealthy reports whether the Signal connection is usable, based on the +// transport state plus the receive watchdog's verdict, and updates the status +// recorder accordingly. It does not actively probe: the watchdog +// (watchReceiveStream) owns probing the receive path and reconnecting. func (c *GrpcClient) IsHealthy() bool { switch c.signalConn.GetState() { case connectivity.TransientFailure: @@ -265,16 +301,8 @@ func (c *GrpcClient) IsHealthy() bool { case connectivity.Ready: } - ctx, cancel := context.WithTimeout(c.ctx, healthCheckTimeout) - defer cancel() - _, err := c.realClient.Send(ctx, &proto.EncryptedMessage{ - Key: c.key.PublicKey().String(), - RemoteKey: "dummy", - Body: nil, - }) - if err != nil { - c.notifyDisconnected(err) - log.Warnf("health check returned: %s", err) + if c.receiveStalled.Load() { + c.notifyDisconnected(errReceiveStreamStalled) return false } c.notifyConnected() @@ -398,6 +426,68 @@ func (c *GrpcClient) Send(msg *proto.Message) error { return err } +// markReceived records that a frame was just read from the receive stream and +// clears the stalled flag. +func (c *GrpcClient) markReceived() { + c.lastReceived.Store(time.Now().UnixNano()) + c.receiveStalled.Store(false) +} + +// idleSinceReceive returns how long the receive stream has been silent. +func (c *GrpcClient) idleSinceReceive() time.Duration { + return time.Since(time.Unix(0, c.lastReceived.Load())) +} + +// watchReceiveStream guards against a receive stream that is transport-alive but +// no longer delivering messages. While the stream is idle past +// receiveInactivityThreshold it sends a self-addressed probe that the Signal +// server routes back to this client. If the probe does not round-trip within +// receiveProbeTimeout the receive direction is considered dead and cancelStream +// is called so the retry loop reconnects. +func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream context.CancelFunc) { + ticker := time.NewTicker(receiveWatchdogInterval) + defer ticker.Stop() + + var probeSentAt time.Time + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if c.idleSinceReceive() < receiveInactivityThreshold { + probeSentAt = time.Time{} + continue + } + + if !probeSentAt.IsZero() && time.Since(probeSentAt) >= receiveProbeTimeout { + log.Warnf("signal receive stream stalled: no messages for %s and probe did not return, reconnecting", c.idleSinceReceive().Round(time.Second)) + c.receiveStalled.Store(true) + c.notifyDisconnected(errReceiveStreamStalled) + cancelStream() + return + } + + if probeSentAt.IsZero() { + if err := c.sendReceiveProbe(); err != nil { + log.Debugf("failed to send signal receive probe: %v", err) + } + probeSentAt = time.Now() + } + } + } +} + +// sendReceiveProbe sends a self-addressed heartbeat. The Signal server routes it +// back to this client, exercising the exact receive path the watchdog guards. +func (c *GrpcClient) sendReceiveProbe() error { + self := c.key.PublicKey().String() + return c.Send(&proto.Message{ + Key: self, + RemoteKey: self, + Body: &proto.Body{Type: proto.Body_HEARTBEAT}, + }) +} + // receive receives messages from other peers coming through the Signal Exchange // and distributes them to worker threads for processing func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) error { @@ -419,6 +509,9 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er return err } + // Any frame from the server proves the receive direction is alive. + c.markReceived() + if msg == nil { continue } diff --git a/shared/signal/client/watchdog_test.go b/shared/signal/client/watchdog_test.go new file mode 100644 index 00000000..1905e756 --- /dev/null +++ b/shared/signal/client/watchdog_test.go @@ -0,0 +1,84 @@ +package client + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "google.golang.org/grpc" + + sigProto "github.com/netbirdio/netbird/shared/signal/proto" + "github.com/netbirdio/netbird/signal/server" +) + +func startTestSignalServer(t *testing.T) string { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + s := grpc.NewServer() + srv, err := server.NewServer(context.Background(), otel.Meter("")) + require.NoError(t, err) + sigProto.RegisterSignalExchangeServer(s, srv) + + go func() { + _ = s.Serve(lis) + }() + t.Cleanup(s.Stop) + + return lis.Addr().String() +} + +// TestReceiveProbeRoundTrips verifies that the watchdog's self-addressed heartbeat +// is routed back to the same client through the signal server. This round-trip is +// what lets the watchdog confirm the receive direction is still delivering. +func TestReceiveProbeRoundTrips(t *testing.T) { + addr := startTestSignalServer(t) + + key, err := wgtypes.GenerateKey() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + client, err := NewClient(ctx, addr, key, false) + require.NoError(t, err) + t.Cleanup(func() { _ = client.Close() }) + + received := make(chan struct{}, 1) + go func() { + _ = client.Receive(ctx, func(msg *sigProto.Message) error { + if msg.GetBody().GetType() == sigProto.Body_HEARTBEAT && msg.GetKey() == key.PublicKey().String() { + select { + case received <- struct{}{}: + default: + } + } + return nil + }) + }() + + streamReady := make(chan struct{}) + go func() { + client.WaitStreamConnected() + close(streamReady) + }() + select { + case <-streamReady: + case <-time.After(5 * time.Second): + t.Fatal("signal stream did not connect within timeout") + } + + require.NoError(t, client.sendReceiveProbe()) + + select { + case <-received: + case <-time.After(3 * time.Second): + t.Fatal("self-addressed heartbeat did not round-trip back through the signal server") + } +} diff --git a/shared/signal/proto/signalexchange.pb.go b/shared/signal/proto/signalexchange.pb.go index 0c80fb48..8e07977f 100644 --- a/shared/signal/proto/signalexchange.pb.go +++ b/shared/signal/proto/signalexchange.pb.go @@ -30,6 +30,7 @@ const ( Body_CANDIDATE Body_Type = 2 Body_MODE Body_Type = 4 Body_GO_IDLE Body_Type = 5 + Body_HEARTBEAT Body_Type = 6 ) // Enum value maps for Body_Type. @@ -40,6 +41,7 @@ var ( 2: "CANDIDATE", 4: "MODE", 5: "GO_IDLE", + 6: "HEARTBEAT", } Body_Type_value = map[string]int32{ "OFFER": 0, @@ -47,6 +49,7 @@ var ( "CANDIDATE": 2, "MODE": 4, "GO_IDLE": 5, + "HEARTBEAT": 6, } ) @@ -463,7 +466,7 @@ var file_signalexchange_proto_rawDesc = []byte{ 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, - 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xc3, 0x04, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, + 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xd2, 0x04, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, @@ -491,38 +494,39 @@ var file_signalexchange_proto_rawDesc = []byte{ 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x29, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x50, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x02, 0x52, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x49, 0x50, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, + 0x72, 0x76, 0x65, 0x72, 0x49, 0x50, 0x88, 0x01, 0x01, 0x22, 0x52, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, - 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x15, - 0x0a, 0x13, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, - 0x64, 0x72, 0x65, 0x73, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x49, 0x64, 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x49, 0x50, 0x4a, 0x04, 0x08, 0x09, 0x10, 0x0a, 0x22, 0x2e, 0x0a, 0x04, 0x4d, - 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, - 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, - 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, - 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, - 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, - 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53, - 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, - 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, - 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, - 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, - 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73, - 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, - 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, - 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, - 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x12, 0x0d, + 0x0a, 0x09, 0x48, 0x45, 0x41, 0x52, 0x54, 0x42, 0x45, 0x41, 0x54, 0x10, 0x06, 0x42, 0x15, 0x0a, + 0x13, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, + 0x72, 0x65, 0x73, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x49, 0x64, 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x49, 0x50, 0x4a, 0x04, 0x08, 0x09, 0x10, 0x0a, 0x22, 0x2e, 0x0a, 0x04, 0x4d, 0x6f, + 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, 0x42, + 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, 0x6f, + 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x0a, + 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, + 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, + 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53, 0x69, + 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, 0x04, + 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, + 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, + 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, + 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43, 0x6f, + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73, 0x69, + 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, + 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, + 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, + 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/shared/signal/proto/signalexchange.proto b/shared/signal/proto/signalexchange.proto index 96a4001e..8c304e37 100644 --- a/shared/signal/proto/signalexchange.proto +++ b/shared/signal/proto/signalexchange.proto @@ -48,6 +48,7 @@ message Body { CANDIDATE = 2; MODE = 4; GO_IDLE = 5; + HEARTBEAT = 6; } Type type = 1; string payload = 2;