Skip to content

Commit

Permalink
feat(occm): support multi region cluster
Browse files Browse the repository at this point in the history
Currently, only a single auth section is supported.
Set the regions in config as:

[Global]
region=REGION1
regions=REGION1
regions=REGION2
regions=REGION3
  • Loading branch information
sergelogvinov committed Dec 1, 2024
1 parent d228854 commit d2e2b76
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ The options in `Global` section are used for openstack-cloud-controller-manager
Keystone user password. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required.
* `region`
Required. Keystone region name.
* `regions`
Optional. Keystone region names, used to specify the regions in which the cloud provider looks for running instances. The value of `region` is the default region. Can be specified multiple times.
* `domain-id`
Keystone user domain ID. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required.
* `domain-name`
Expand Down Expand Up @@ -317,7 +319,7 @@ Although the openstack-cloud-controller-manager was initially implemented with N
call](https://docs.openstack.org/api-ref/load-balancer/v2/?expanded=create-a-load-balancer-detail#creating-a-fully-populated-load-balancer).
Setting this option to true will create loadbalancers using serial API calls which first create an unpopulated
loadbalancer, then populate its listeners, pools and members. This is a compatibility option at the expense of
increased load on the OpenStack API. Default: false
increased load on the OpenStack API. Default: false
NOTE:
Expand Down
16 changes: 16 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type AuthOpts struct {
UserDomainID string `gcfg:"user-domain-id" mapstructure:"user-domain-id" name:"os-userDomainID" value:"optional"`
UserDomainName string `gcfg:"user-domain-name" mapstructure:"user-domain-name" name:"os-userDomainName" value:"optional"`
Region string `name:"os-region"`
Regions []string `name:"os-regions" value:"optional"`
EndpointType gophercloud.Availability `gcfg:"os-endpoint-type" mapstructure:"os-endpoint-type" name:"os-endpointType" value:"optional"`
CAFile string `gcfg:"ca-file" mapstructure:"ca-file" name:"os-certAuthorityPath" value:"optional"`
TLSInsecure string `gcfg:"tls-insecure" mapstructure:"tls-insecure" name:"os-TLSInsecure" value:"optional" matches:"^true|false$"`
Expand Down Expand Up @@ -87,6 +88,7 @@ func LogCfg(authOpts AuthOpts) {
klog.V(5).Infof("UserDomainID: %s", authOpts.UserDomainID)
klog.V(5).Infof("UserDomainName: %s", authOpts.UserDomainName)
klog.V(5).Infof("Region: %s", authOpts.Region)
klog.V(5).Infof("Regions: %s", authOpts.Regions)
klog.V(5).Infof("EndpointType: %s", authOpts.EndpointType)
klog.V(5).Infof("CAFile: %s", authOpts.CAFile)
klog.V(5).Infof("CertFile: %s", authOpts.CertFile)
Expand Down Expand Up @@ -232,6 +234,20 @@ func ReadClouds(authOpts *AuthOpts) error {
authOpts.ApplicationCredentialName = replaceEmpty(authOpts.ApplicationCredentialName, cloud.AuthInfo.ApplicationCredentialName)
authOpts.ApplicationCredentialSecret = replaceEmpty(authOpts.ApplicationCredentialSecret, cloud.AuthInfo.ApplicationCredentialSecret)

regions := strings.Split(authOpts.Region, ",")
if len(regions) > 1 {
authOpts.Region = regions[0]
}

for _, r := range cloud.Regions {
// Support only single auth section in clouds.yaml
if r.Values.AuthInfo == nil && r.Name != authOpts.Region {
regions = append(regions, r.Name)
}
}

authOpts.Regions = regions

return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/csi/cinder/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func GetConfigFromFiles(configFilePaths []string) (Config, error) {
}
}

for _, global := range cfg.Global {
for idx, global := range cfg.Global {
// Update the config with data from clouds.yaml if UseClouds is enabled
if global.UseClouds {
if global.CloudsFile != "" {
Expand All @@ -138,6 +138,10 @@ func GetConfigFromFiles(configFilePaths []string) (Config, error) {
}
klog.V(5).Infof("Credentials are loaded from %s:", global.CloudsFile)
}

if len(global.Regions) == 0 {
cfg.Global[idx].Regions = []string{global.Region}
}
}

return cfg, nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/csi/cinder/openstack/openstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile,
TenantID: fakeTenantID,
Region: fakeRegion,
Regions: []string{fakeRegion},
}
expectedOpts.Global["cloud2"] = &client.AuthOpts{
Username: fakeUserName_cloud2,
Expand All @@ -121,6 +122,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud2,
TenantID: fakeTenantID_cloud2,
Region: fakeRegion_cloud2,
Regions: []string{fakeRegion_cloud2},
}
expectedOpts.Global["cloud3"] = &client.AuthOpts{
Username: fakeUserName_cloud3,
Expand All @@ -130,6 +132,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud3,
TenantID: fakeTenantID_cloud3,
Region: fakeRegion_cloud3,
Regions: []string{fakeRegion_cloud3},
}

expectedOpts.BlockStorage.RescanOnResize = true
Expand Down Expand Up @@ -224,6 +227,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile,
TenantID: fakeTenantID,
Region: fakeRegion,
Regions: []string{fakeRegion},
EndpointType: gophercloud.AvailabilityPublic,
UseClouds: true,
CloudsFile: wd + "/fixtures/clouds.yaml",
Expand All @@ -237,6 +241,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud2,
TenantID: fakeTenantID_cloud2,
Region: fakeRegion_cloud2,
Regions: []string{fakeRegion_cloud2},
EndpointType: gophercloud.AvailabilityPublic,
UseClouds: true,
CloudsFile: wd + "/fixtures/clouds.yaml",
Expand All @@ -250,6 +255,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud3,
TenantID: fakeTenantID_cloud3,
Region: fakeRegion_cloud3,
Regions: []string{fakeRegion_cloud3},
EndpointType: gophercloud.AvailabilityPublic,
UseClouds: true,
CloudsFile: wd + "/fixtures/clouds.yaml",
Expand Down
178 changes: 140 additions & 38 deletions pkg/openstack/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
sysos "os"
"regexp"
"slices"
"strings"

"github.com/gophercloud/gophercloud/v2"
Expand All @@ -46,9 +47,9 @@ const (

// InstancesV2 encapsulates an implementation of InstancesV2 for OpenStack.
type InstancesV2 struct {
compute *gophercloud.ServiceClient
network *gophercloud.ServiceClient
region string
compute map[string]*gophercloud.ServiceClient
network map[string]*gophercloud.ServiceClient
regions []string
regionProviderID bool
networkingOpts NetworkingOpts
}
Expand All @@ -57,16 +58,25 @@ type InstancesV2 struct {
func (os *OpenStack) InstancesV2() (cloudprovider.InstancesV2, bool) {
klog.V(4).Info("openstack.Instancesv2() called")

compute, err := client.NewComputeV2(os.provider, os.epOpts)
if err != nil {
klog.Errorf("unable to access compute v2 API : %v", err)
return nil, false
}
var err error
compute := make(map[string]*gophercloud.ServiceClient, len(os.regions))
network := make(map[string]*gophercloud.ServiceClient, len(os.regions))

network, err := client.NewNetworkV2(os.provider, os.epOpts)
if err != nil {
klog.Errorf("unable to access network v2 API : %v", err)
return nil, false
for _, region := range os.regions {
opt := os.epOpts
opt.Region = region

compute[region], err = client.NewComputeV2(os.provider, opt)
if err != nil {
klog.Errorf("unable to access compute v2 API : %v", err)
return nil, false
}

network[region], err = client.NewNetworkV2(os.provider, opt)
if err != nil {
klog.Errorf("unable to access network v2 API : %v", err)
return nil, false
}
}

regionalProviderID := false
Expand All @@ -77,17 +87,23 @@ func (os *OpenStack) InstancesV2() (cloudprovider.InstancesV2, bool) {
return &InstancesV2{
compute: compute,
network: network,
region: os.epOpts.Region,
regions: os.regions,
regionProviderID: regionalProviderID,
networkingOpts: os.networkingOpts,
}, true
}

// InstanceExists indicates whether a given node exists according to the cloud provider
func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool, error) {
_, err := i.getInstance(ctx, node)
klog.V(4).InfoS("openstack.InstanceExists() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

_, _, err := i.getInstance(ctx, node)
if err == cloudprovider.InstanceNotFound {
klog.V(6).Infof("instance not found for node: %s", node.Name)
klog.V(6).InfoS("Node is not found in cloud provider", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])
return false, nil
}

Expand All @@ -100,7 +116,11 @@ func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool,

// InstanceShutdown returns true if the instance is shutdown according to the cloud provider.
func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, error) {
server, err := i.getInstance(ctx, node)
klog.V(4).InfoS("openstack.InstanceShutdown() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

server, _, err := i.getInstance(ctx, node)
if err != nil {
return false, err
}
Expand All @@ -115,7 +135,11 @@ func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool

// InstanceMetadata returns the instance's metadata.
func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprovider.InstanceMetadata, error) {
srv, err := i.getInstance(ctx, node)
klog.V(4).InfoS("openstack.InstanceMetadata() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

srv, region, err := i.getInstance(ctx, node)
if err != nil {
return nil, err
}
Expand All @@ -124,62 +148,140 @@ func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*clo
server = *srv
}

instanceType, err := srvInstanceType(ctx, i.compute, &server)
instanceType, err := srvInstanceType(ctx, i.compute[region], &server)
if err != nil {
return nil, err
}

ports, err := getAttachedPorts(ctx, i.network, server.ID)
ports, err := getAttachedPorts(ctx, i.network[region], server.ID)
if err != nil {
return nil, err
}

addresses, err := nodeAddresses(ctx, &server, ports, i.network, i.networkingOpts)
addresses, err := nodeAddresses(ctx, &server, ports, i.network[region], i.networkingOpts)
if err != nil {
return nil, err
}

availabilityZone := util.SanitizeLabel(server.AvailabilityZone)

return &cloudprovider.InstanceMetadata{
ProviderID: i.makeInstanceID(&server),
ProviderID: i.makeInstanceID(&server, region),
InstanceType: instanceType,
NodeAddresses: addresses,
Zone: availabilityZone,
Region: i.region,
Region: region,
}, nil
}

func (i *InstancesV2) makeInstanceID(srv *servers.Server) string {
func (i *InstancesV2) makeInstanceID(srv *servers.Server, region string) string {
if i.regionProviderID {
return fmt.Sprintf("%s://%s/%s", ProviderName, i.region, srv.ID)
return fmt.Sprintf("%s://%s/%s", ProviderName, region, srv.ID)
}
return fmt.Sprintf("%s:///%s", ProviderName, srv.ID)
}

func (i *InstancesV2) getInstance(ctx context.Context, node *v1.Node) (*servers.Server, error) {
if node.Spec.ProviderID == "" {
return getServerByName(ctx, i.compute, node.Name)
func (i *InstancesV2) getInstance(ctx context.Context, node *v1.Node) (*servers.Server, string, error) {
var instanceID, instanceRegion string

if node.Spec.ProviderID != "" {
var err error

instanceID, instanceRegion, err = instanceIDFromProviderID(node.Spec.ProviderID)
if err != nil {
return nil, "", err
}
}

instanceID, instanceRegion, err := instanceIDFromProviderID(node.Spec.ProviderID)
if err != nil {
return nil, err
if instanceRegion != "" {
if slices.Contains(i.regions, instanceRegion) {
return i.getInstanceByID(ctx, instanceID, []string{instanceRegion})
}

return nil, "", fmt.Errorf("getInstance: ProviderID \"%s\" didn't match supported regions \"%s\"", node.Spec.ProviderID, strings.Join(i.regions, ","))
}

// At this point we know that ProviderID is not properly set or it doesn't contain region information
// We need to search for the instance in all regions
var searchRegions []string

// We cannot trust the region label, so we need to check the region
instanceRegion = node.Labels[v1.LabelTopologyRegion]
if slices.Contains(i.regions, instanceRegion) {
searchRegions = []string{instanceRegion}
}

if instanceRegion != "" && instanceRegion != i.region {
return nil, fmt.Errorf("ProviderID \"%s\" didn't match supported region \"%s\"", node.Spec.ProviderID, i.region)
for _, r := range i.regions {
if r != instanceRegion {
searchRegions = append(searchRegions, r)
}
}

klog.V(4).InfoS("openstack.getInstance() trying to find the instance in regions", "node", klog.KObj(node),
"instanceID", instanceID,
"regions", strings.Join(searchRegions, ","))

if instanceID == "" {
return i.getInstanceByName(ctx, node.Name, searchRegions)
}

return i.getInstanceByID(ctx, instanceID, searchRegions)
}

func (i *InstancesV2) getInstanceByID(ctx context.Context, instanceID string, searchRegions []string) (*servers.Server, string, error) {
server := servers.Server{}

mc := metrics.NewMetricContext("server", "get")
server, err := servers.Get(ctx, i.compute, instanceID).Extract()
if mc.ObserveRequest(err) != nil {
if errors.IsNotFound(err) {
return nil, cloudprovider.InstanceNotFound
for _, r := range searchRegions {
err := servers.Get(ctx, i.compute[r], instanceID).ExtractInto(&server)
if mc.ObserveRequest(err) != nil {
if errors.IsNotFound(err) {
continue
}

return nil, "", err
}
return nil, err

return &server, r, nil
}
return server, nil

return nil, "", cloudprovider.InstanceNotFound
}

// getInstanceByName looks up a server by name, querying the regions in
// searchRegions in order and stopping at the first region that returns a match.
//
// It returns the server together with the region it was found in. When no
// region returns a server with that name it returns
// cloudprovider.InstanceNotFound. When a region lists more than one server
// with that name the search is aborted with errors.ErrMultipleResults. Any
// other listing error is returned as-is and also aborts the search.
func (i *InstancesV2) getInstanceByName(ctx context.Context, name string, searchRegions []string) (*servers.Server, string, error) {
	opts := servers.ListOpts{
		// Anchor and quote the name so it is matched literally and in full.
		Name: fmt.Sprintf("^%s$", regexp.QuoteMeta(name)),
	}

	for _, r := range searchRegions {
		// Use a fresh metric context for every region so that each observation
		// is tied to a single list request instead of accumulating across the
		// regions already searched.
		// NOTE(review): assumes NewMetricContext records its start time on creation — confirm.
		mc := metrics.NewMetricContext("server", "list")
		serverList := make([]servers.Server, 0, 1)

		pager := servers.List(i.compute[r], opts)

		err := pager.EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) {
			s, err := servers.ExtractServers(page)
			if err != nil {
				return false, err
			}
			serverList = append(serverList, s...)
			// A name must identify exactly one server; stop paging as soon as it is ambiguous.
			if len(serverList) > 1 {
				return false, errors.ErrMultipleResults
			}
			return true, nil
		})
		if mc.ObserveRequest(err) != nil {
			return nil, "", err
		}

		if len(serverList) == 0 {
			// Nothing in this region, try the next one.
			continue
		}

		return &serverList[0], r, nil
	}

	return nil, "", cloudprovider.InstanceNotFound
}

func getServerByName(ctx context.Context, client *gophercloud.ServiceClient, name string) (*servers.Server, error) {
Expand Down
Loading

0 comments on commit d2e2b76

Please sign in to comment.