[management] fix L4 service update when no custom port (#6396)
This fixes an issue where L4 service update is not possible when proxy clusters don't support custom ports
This commit is contained in:
@@ -488,6 +488,195 @@ func TestUpdate_AllowsPortChange(t *testing.T) {
|
||||
assert.Equal(t, uint16(54321), updated.ListenPort, "explicit port change should be applied")
|
||||
}
|
||||
|
||||
func TestUpdate_PreservesPortWhenCustomPortsNotSupported(t *testing.T) {
|
||||
mgr, testStore, _ := setupL4Test(t, boolPtr(false))
|
||||
ctx := context.Background()
|
||||
|
||||
existing := seedService(t, testStore, "tcp-svc", "tcp", testCluster, testCluster, 12345)
|
||||
|
||||
updated := &rpservice.Service{
|
||||
ID: existing.ID,
|
||||
AccountID: testAccountID,
|
||||
Name: "tcp-svc-renamed",
|
||||
Mode: "tcp",
|
||||
Domain: testCluster,
|
||||
ProxyCluster: testCluster,
|
||||
ListenPort: 0,
|
||||
Enabled: true,
|
||||
Targets: []*rpservice.Target{
|
||||
{AccountID: testAccountID, TargetId: testPeerID, TargetType: rpservice.TargetTypePeer, Protocol: "tcp", Port: 9090, Enabled: true},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := mgr.persistServiceUpdate(ctx, testAccountID, updated)
|
||||
require.NoError(t, err, "update must not be rejected by the custom-port capability check")
|
||||
assert.Equal(t, uint16(12345), updated.ListenPort, "existing listen port should be preserved on unsupported cluster")
|
||||
}
|
||||
|
||||
func TestUpdate_PreservesPortWhenCustomPortsUnknown(t *testing.T) {
|
||||
mgr, testStore, _ := setupL4Test(t, nil)
|
||||
ctx := context.Background()
|
||||
|
||||
existing := seedService(t, testStore, "tcp-svc", "tcp", testCluster, testCluster, 12345)
|
||||
|
||||
updated := &rpservice.Service{
|
||||
ID: existing.ID,
|
||||
AccountID: testAccountID,
|
||||
Name: "tcp-svc-renamed",
|
||||
Mode: "tcp",
|
||||
Domain: testCluster,
|
||||
ProxyCluster: testCluster,
|
||||
ListenPort: 0,
|
||||
Enabled: true,
|
||||
Targets: []*rpservice.Target{
|
||||
{AccountID: testAccountID, TargetId: testPeerID, TargetType: rpservice.TargetTypePeer, Protocol: "tcp", Port: 9090, Enabled: true},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := mgr.persistServiceUpdate(ctx, testAccountID, updated)
|
||||
require.NoError(t, err, "update must not be rejected when cluster capability is unknown")
|
||||
assert.Equal(t, uint16(12345), updated.ListenPort, "existing listen port should be preserved when capability is unknown")
|
||||
}
|
||||
|
||||
func TestUpdate_RejectsPortChangeWhenCustomPortsNotSupported(t *testing.T) {
|
||||
mgr, testStore, _ := setupL4Test(t, boolPtr(false))
|
||||
ctx := context.Background()
|
||||
|
||||
existing := seedService(t, testStore, "tcp-svc", "tcp", testCluster, testCluster, 12345)
|
||||
|
||||
updated := &rpservice.Service{
|
||||
ID: existing.ID,
|
||||
AccountID: testAccountID,
|
||||
Name: "tcp-svc",
|
||||
Mode: "tcp",
|
||||
Domain: testCluster,
|
||||
ProxyCluster: testCluster,
|
||||
ListenPort: 54321,
|
||||
Enabled: true,
|
||||
Targets: []*rpservice.Target{
|
||||
{AccountID: testAccountID, TargetId: testPeerID, TargetType: rpservice.TargetTypePeer, Protocol: "tcp", Port: 9090, Enabled: true},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := mgr.persistServiceUpdate(ctx, testAccountID, updated)
|
||||
require.Error(t, err, "explicit port change on update must be rejected on unsupported clusters")
|
||||
assert.Contains(t, err.Error(), "custom ports not supported on target cluster")
|
||||
}
|
||||
|
||||
func TestUpdate_TLSPortChangeAllowedWhenNotSupported(t *testing.T) {
|
||||
mgr, testStore, _ := setupL4Test(t, boolPtr(false))
|
||||
ctx := context.Background()
|
||||
|
||||
existing := seedService(t, testStore, "tls-svc", "tls", "app.example.com", testCluster, 443)
|
||||
|
||||
updated := &rpservice.Service{
|
||||
ID: existing.ID,
|
||||
AccountID: testAccountID,
|
||||
Name: "tls-svc",
|
||||
Mode: "tls",
|
||||
Domain: "app.example.com",
|
||||
ProxyCluster: testCluster,
|
||||
ListenPort: 9999,
|
||||
Enabled: true,
|
||||
Targets: []*rpservice.Target{
|
||||
{AccountID: testAccountID, TargetId: testPeerID, TargetType: rpservice.TargetTypePeer, Protocol: "tcp", Port: 8443, Enabled: true},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := mgr.persistServiceUpdate(ctx, testAccountID, updated)
|
||||
require.NoError(t, err, "TLS port change uses SNI routing and is exempt from the custom-port check")
|
||||
assert.Equal(t, uint16(9999), updated.ListenPort, "TLS port change should be applied")
|
||||
}
|
||||
|
||||
func TestValidateL4PortDiffOnClusterDiff(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
mode string
|
||||
customPorts *bool
|
||||
newPort uint16
|
||||
oldPort uint16
|
||||
wantErr bool
|
||||
}{
|
||||
{"tcp port change unsupported", "tcp", boolPtr(false), 54321, 12345, true},
|
||||
{"tcp port change unknown capability", "tcp", nil, 54321, 12345, true},
|
||||
{"udp port change unsupported", "udp", boolPtr(false), 54321, 12345, true},
|
||||
{"tcp first port assignment unsupported", "tcp", boolPtr(false), 54321, 0, true},
|
||||
{"tcp port change supported", "tcp", boolPtr(true), 54321, 12345, false},
|
||||
{"tcp port unchanged unsupported", "tcp", boolPtr(false), 12345, 12345, false},
|
||||
{"tcp zero port unsupported", "tcp", boolPtr(false), 0, 12345, false},
|
||||
{"tls port change unsupported", "tls", boolPtr(false), 9999, 443, false},
|
||||
{"http mode ignored", "http", boolPtr(false), 54321, 12345, false},
|
||||
{"empty mode ignored", "", boolPtr(false), 54321, 12345, false},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
newSvc := &rpservice.Service{Mode: tc.mode, ListenPort: tc.newPort, ProxyCluster: testCluster}
|
||||
oldSvc := &rpservice.Service{Mode: tc.mode, ListenPort: tc.oldPort, ProxyCluster: testCluster}
|
||||
|
||||
err := validateL4PortDiffOnClusterDiff(tc.customPorts, newSvc, oldSvc)
|
||||
if tc.wantErr {
|
||||
assert.Error(t, err, "port diff should be rejected for %s", tc.name)
|
||||
} else {
|
||||
assert.NoError(t, err, "port diff should be allowed for %s", tc.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdate_PortConflictRejected(t *testing.T) {
|
||||
mgr, testStore, _ := setupL4Test(t, boolPtr(true))
|
||||
ctx := context.Background()
|
||||
|
||||
seedService(t, testStore, "tcp-a", "tcp", "tcp-a."+testCluster, testCluster, 5432)
|
||||
svcB := seedService(t, testStore, "tcp-b", "tcp", "tcp-b."+testCluster, testCluster, 6543)
|
||||
|
||||
updated := &rpservice.Service{
|
||||
ID: svcB.ID,
|
||||
AccountID: testAccountID,
|
||||
Name: "tcp-b",
|
||||
Mode: "tcp",
|
||||
Domain: "tcp-b." + testCluster,
|
||||
ProxyCluster: testCluster,
|
||||
ListenPort: 5432,
|
||||
Enabled: true,
|
||||
Targets: []*rpservice.Target{
|
||||
{AccountID: testAccountID, TargetId: testPeerID, TargetType: rpservice.TargetTypePeer, Protocol: "tcp", Port: 9090, Enabled: true},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := mgr.persistServiceUpdate(ctx, testAccountID, updated)
|
||||
require.Error(t, err, "updating to a port held by another service should be rejected")
|
||||
assert.Contains(t, err.Error(), "already in use")
|
||||
}
|
||||
|
||||
func TestUpdate_AutoAssignsWhenNoPort(t *testing.T) {
|
||||
mgr, testStore, _ := setupL4Test(t, boolPtr(false))
|
||||
ctx := context.Background()
|
||||
|
||||
existing := seedService(t, testStore, "tcp-svc", "tcp", testCluster, testCluster, 0)
|
||||
|
||||
updated := &rpservice.Service{
|
||||
ID: existing.ID,
|
||||
AccountID: testAccountID,
|
||||
Name: "tcp-svc",
|
||||
Mode: "tcp",
|
||||
Domain: testCluster,
|
||||
ProxyCluster: testCluster,
|
||||
ListenPort: 0,
|
||||
Enabled: true,
|
||||
Targets: []*rpservice.Target{
|
||||
{AccountID: testAccountID, TargetId: testPeerID, TargetType: rpservice.TargetTypePeer, Protocol: "tcp", Port: 9090, Enabled: true},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := mgr.persistServiceUpdate(ctx, testAccountID, updated)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, updated.ListenPort >= autoAssignPortMin && updated.ListenPort <= autoAssignPortMax,
|
||||
"auto-assigned port %d should be in range [%d, %d]", updated.ListenPort, autoAssignPortMin, autoAssignPortMax)
|
||||
assert.True(t, updated.PortAutoAssigned, "PortAutoAssigned should be set when update triggers auto-assignment")
|
||||
}
|
||||
|
||||
func TestCreateServiceFromPeer_TCP(t *testing.T) {
|
||||
mgr, _, _ := setupL4Test(t, boolPtr(false))
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -338,7 +338,7 @@ func (m *Manager) persistNewService(ctx context.Context, accountID string, svc *
|
||||
}
|
||||
}
|
||||
|
||||
if err := m.ensureL4Port(ctx, transaction, svc, customPorts); err != nil {
|
||||
if err := m.ensureL4Port(ctx, transaction, svc, customPorts, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -367,11 +367,11 @@ func (m *Manager) clusterCustomPorts(ctx context.Context, svc *service.Service)
|
||||
|
||||
// ensureL4Port auto-assigns a listen port when needed and validates cluster support.
|
||||
// customPorts must be pre-computed via clusterCustomPorts before entering a transaction.
|
||||
func (m *Manager) ensureL4Port(ctx context.Context, tx store.Store, svc *service.Service, customPorts *bool) error {
|
||||
func (m *Manager) ensureL4Port(ctx context.Context, tx store.Store, svc *service.Service, customPorts *bool, serviceUpdate bool) error {
|
||||
if !service.IsL4Protocol(svc.Mode) {
|
||||
return nil
|
||||
}
|
||||
if service.IsPortBasedProtocol(svc.Mode) && svc.ListenPort > 0 && (customPorts == nil || !*customPorts) {
|
||||
if service.IsPortBasedProtocol(svc.Mode) && svc.ListenPort > 0 && !serviceUpdate && (customPorts == nil || !*customPorts) {
|
||||
if svc.Source != service.SourceEphemeral {
|
||||
return status.Errorf(status.InvalidArgument, "custom ports not supported on cluster %s", svc.ProxyCluster)
|
||||
}
|
||||
@@ -465,7 +465,7 @@ func (m *Manager) persistNewEphemeralService(ctx context.Context, accountID, pee
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.ensureL4Port(ctx, transaction, svc, customPorts); err != nil {
|
||||
if err := m.ensureL4Port(ctx, transaction, svc, customPorts, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -651,12 +651,22 @@ func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.St
|
||||
m.preserveListenPort(service, existingService)
|
||||
updateInfo.serviceEnabledChanged = existingService.Enabled != service.Enabled
|
||||
|
||||
if err := m.ensureL4Port(ctx, transaction, service, customPorts); err != nil {
|
||||
// if the service is being updated, and we decide in the future to allow mode update,
|
||||
// we should reconsider the currently assigned port if not 0 for clusters that don't support custom ports
|
||||
if err := validateL4PortDiffOnClusterDiff(customPorts, service, existingService); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.ensureL4Port(ctx, transaction, service, customPorts, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// we can try carrying the previous service port into a new cluster, if this becomes a problem for multiple users,
|
||||
// we should reconsider adding another check
|
||||
if err := m.checkPortConflict(ctx, transaction, service); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := transaction.UpdateService(ctx, service); err != nil {
|
||||
return fmt.Errorf("update service: %w", err)
|
||||
}
|
||||
@@ -664,6 +674,21 @@ func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.St
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateL4PortDiffOnClusterDiff checks if custom L4 ports are configured and validates port changes across clusters.
|
||||
// It ensures no port changes if custom ports are unsupported for a given cluster and protocol mode.
|
||||
// Returns an error if validation fails, otherwise returns nil.
|
||||
func validateL4PortDiffOnClusterDiff(customPorts *bool, newSVC, oldSVC *service.Service) error {
|
||||
if !service.IsPortBasedProtocol(newSVC.Mode) || (customPorts != nil && *customPorts) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if newSVC.ListenPort != 0 && newSVC.ListenPort != oldSVC.ListenPort {
|
||||
return status.Errorf(status.InvalidArgument, "custom ports not supported on target cluster %s", newSVC.ProxyCluster)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleDomainChange validates the new domain is free inside the transaction
|
||||
// and applies the pre-resolved cluster (computed outside the tx by
|
||||
// resolveEffectiveCluster). It must NOT call clusterDeriver here: that talks
|
||||
|
||||
Reference in New Issue
Block a user