Skip to content

Commit

Permalink
support multi region cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
sergelogvinov committed Sep 26, 2024
1 parent d069b1e commit 2f86fa7
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ The options in `Global` section are used for openstack-cloud-controller-manager
Keystone user password. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required.
* `region`
Required. Keystone region name.
* `regions`
Optional. Keystone region name, which is used to specify regions for the cloud provider where the instance is running. Region is default region name. Can be specified multiple times.
* `domain-id`
Keystone user domain ID. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required.
* `domain-name`
Expand Down Expand Up @@ -317,7 +319,7 @@ Although the openstack-cloud-controller-manager was initially implemented with N
call](https://docs.openstack.org/api-ref/load-balancer/v2/?expanded=create-a-load-balancer-detail#creating-a-fully-populated-load-balancer).
Setting this option to true will create loadbalancers using serial API calls which first create an unpopulated
loadbalancer, then populate its listeners, pools and members. This is a compatibility option at the expense of
increased load on the OpenStack API. Default: false
increased load on the OpenStack API. Default: false
NOTE:
Expand Down
16 changes: 16 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type AuthOpts struct {
UserDomainID string `gcfg:"user-domain-id" mapstructure:"user-domain-id" name:"os-userDomainID" value:"optional"`
UserDomainName string `gcfg:"user-domain-name" mapstructure:"user-domain-name" name:"os-userDomainName" value:"optional"`
Region string `name:"os-region"`
Regions []string `name:"os-regions" value:"optional"`
EndpointType gophercloud.Availability `gcfg:"os-endpoint-type" mapstructure:"os-endpoint-type" name:"os-endpointType" value:"optional"`
CAFile string `gcfg:"ca-file" mapstructure:"ca-file" name:"os-certAuthorityPath" value:"optional"`
TLSInsecure string `gcfg:"tls-insecure" mapstructure:"tls-insecure" name:"os-TLSInsecure" value:"optional" matches:"^true|false$"`
Expand Down Expand Up @@ -87,6 +88,7 @@ func LogCfg(authOpts AuthOpts) {
klog.V(5).Infof("UserDomainID: %s", authOpts.UserDomainID)
klog.V(5).Infof("UserDomainName: %s", authOpts.UserDomainName)
klog.V(5).Infof("Region: %s", authOpts.Region)
klog.V(5).Infof("Regions: %s", authOpts.Regions)
klog.V(5).Infof("EndpointType: %s", authOpts.EndpointType)
klog.V(5).Infof("CAFile: %s", authOpts.CAFile)
klog.V(5).Infof("CertFile: %s", authOpts.CertFile)
Expand Down Expand Up @@ -232,6 +234,20 @@ func ReadClouds(authOpts *AuthOpts) error {
authOpts.ApplicationCredentialName = replaceEmpty(authOpts.ApplicationCredentialName, cloud.AuthInfo.ApplicationCredentialName)
authOpts.ApplicationCredentialSecret = replaceEmpty(authOpts.ApplicationCredentialSecret, cloud.AuthInfo.ApplicationCredentialSecret)

regions := strings.Split(authOpts.Region, ",")
if len(regions) > 1 {
authOpts.Region = regions[0]
}

for _, r := range cloud.Regions {
// Support only single auth section in clouds.yaml
if r.Values.AuthInfo == nil && r.Name != authOpts.Region {
regions = append(regions, r.Name)
}
}

authOpts.Regions = regions

return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/csi/cinder/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func GetConfigFromFiles(configFilePaths []string) (Config, error) {
}
}

for _, global := range cfg.Global {
for idx, global := range cfg.Global {
// Update the config with data from clouds.yaml if UseClouds is enabled
if global.UseClouds {
if global.CloudsFile != "" {
Expand All @@ -136,6 +136,10 @@ func GetConfigFromFiles(configFilePaths []string) (Config, error) {
}
klog.V(5).Infof("Credentials are loaded from %s:", global.CloudsFile)
}

if len(global.Regions) == 0 {
cfg.Global[idx].Regions = []string{global.Region}
}
}

return cfg, nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/csi/cinder/openstack/openstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile,
TenantID: fakeTenantID,
Region: fakeRegion,
Regions: []string{fakeRegion},
}
expectedOpts.Global["cloud2"] = &client.AuthOpts{
Username: fakeUserName_cloud2,
Expand All @@ -121,6 +122,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud2,
TenantID: fakeTenantID_cloud2,
Region: fakeRegion_cloud2,
Regions: []string{fakeRegion_cloud2},
}
expectedOpts.Global["cloud3"] = &client.AuthOpts{
Username: fakeUserName_cloud3,
Expand All @@ -130,6 +132,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud3,
TenantID: fakeTenantID_cloud3,
Region: fakeRegion_cloud3,
Regions: []string{fakeRegion_cloud3},
}

expectedOpts.BlockStorage.RescanOnResize = true
Expand Down Expand Up @@ -224,6 +227,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile,
TenantID: fakeTenantID,
Region: fakeRegion,
Regions: []string{fakeRegion},
EndpointType: gophercloud.AvailabilityPublic,
UseClouds: true,
CloudsFile: wd + "/fixtures/clouds.yaml",
Expand All @@ -237,6 +241,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud2,
TenantID: fakeTenantID_cloud2,
Region: fakeRegion_cloud2,
Regions: []string{fakeRegion_cloud2},
EndpointType: gophercloud.AvailabilityPublic,
UseClouds: true,
CloudsFile: wd + "/fixtures/clouds.yaml",
Expand All @@ -250,6 +255,7 @@ rescan-on-resize=true`
CAFile: fakeCAfile_cloud3,
TenantID: fakeTenantID_cloud3,
Region: fakeRegion_cloud3,
Regions: []string{fakeRegion_cloud3},
EndpointType: gophercloud.AvailabilityPublic,
UseClouds: true,
CloudsFile: wd + "/fixtures/clouds.yaml",
Expand Down
175 changes: 122 additions & 53 deletions pkg/openstack/instancesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"
sysos "os"
"slices"
"strings"

"github.com/gophercloud/gophercloud/v2"
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/servers"
Expand All @@ -34,9 +36,9 @@ import (

// InstancesV2 encapsulates an implementation of InstancesV2 for OpenStack.
type InstancesV2 struct {
compute *gophercloud.ServiceClient
network *gophercloud.ServiceClient
region string
compute map[string]*gophercloud.ServiceClient
network map[string]*gophercloud.ServiceClient
regions []string
regionProviderID bool
networkingOpts NetworkingOpts
}
Expand All @@ -52,16 +54,25 @@ func (os *OpenStack) InstancesV2() (cloudprovider.InstancesV2, bool) {
func (os *OpenStack) instancesv2() (*InstancesV2, bool) {
klog.V(4).Info("openstack.Instancesv2() called")

compute, err := client.NewComputeV2(os.provider, os.epOpts)
if err != nil {
klog.Errorf("unable to access compute v2 API : %v", err)
return nil, false
}
var err error
compute := make(map[string]*gophercloud.ServiceClient, len(os.regions))
network := make(map[string]*gophercloud.ServiceClient, len(os.regions))

network, err := client.NewNetworkV2(os.provider, os.epOpts)
if err != nil {
klog.Errorf("unable to access network v2 API : %v", err)
return nil, false
for _, region := range os.regions {
opt := os.epOpts
opt.Region = region

compute[region], err = client.NewComputeV2(os.provider, opt)
if err != nil {
klog.Errorf("unable to access compute v2 API : %v", err)
return nil, false
}

network[region], err = client.NewNetworkV2(os.provider, opt)
if err != nil {
klog.Errorf("unable to access network v2 API : %v", err)
return nil, false
}
}

regionalProviderID := false
Expand All @@ -72,17 +83,23 @@ func (os *OpenStack) instancesv2() (*InstancesV2, bool) {
return &InstancesV2{
compute: compute,
network: network,
region: os.epOpts.Region,
regions: os.regions,
regionProviderID: regionalProviderID,
networkingOpts: os.networkingOpts,
}, true
}

// InstanceExists indicates whether a given node exists according to the cloud provider
func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool, error) {
_, err := i.getInstance(ctx, node)
klog.V(4).InfoS("openstack.InstanceExists() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

_, _, err := i.getInstance(ctx, node)
if err == cloudprovider.InstanceNotFound {
klog.V(6).Infof("instance not found for node: %s", node.Name)
klog.V(6).InfoS("Node is not found in cloud provider", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])
return false, nil
}

Expand All @@ -95,7 +112,11 @@ func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool,

// InstanceShutdown returns true if the instance is shutdown according to the cloud provider.
func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, error) {
server, err := i.getInstance(ctx, node)
klog.V(4).InfoS("openstack.InstanceShutdown() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

server, _, err := i.getInstance(ctx, node)
if err != nil {
return false, err
}
Expand All @@ -110,7 +131,7 @@ func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool

// InstanceMetadata returns the instance's metadata.
func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprovider.InstanceMetadata, error) {
srv, err := i.getInstance(ctx, node)
srv, region, err := i.getInstance(ctx, node)
if err != nil {
return nil, err
}
Expand All @@ -119,79 +140,127 @@ func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*clo
server = *srv
}

instanceType, err := srvInstanceType(i.compute, &server)
instanceType, err := srvInstanceType(i.compute[region], &server)
if err != nil {
return nil, err
}

ports, err := getAttachedPorts(i.network, server.ID)
ports, err := getAttachedPorts(i.network[region], server.ID)
if err != nil {
return nil, err
}

addresses, err := nodeAddresses(&server, ports, i.network, i.networkingOpts)
addresses, err := nodeAddresses(&server, ports, i.network[region], i.networkingOpts)
if err != nil {
return nil, err
}

availabilityZone := util.SanitizeLabel(server.AvailabilityZone)

return &cloudprovider.InstanceMetadata{
ProviderID: i.makeInstanceID(&server),
ProviderID: i.makeInstanceID(&server, region),
InstanceType: instanceType,
NodeAddresses: addresses,
Zone: availabilityZone,
Region: i.region,
Region: region,
}, nil
}

func (i *InstancesV2) makeInstanceID(srv *servers.Server) string {
func (i *InstancesV2) makeInstanceID(srv *servers.Server, region string) string {
if i.regionProviderID {
return fmt.Sprintf("%s://%s/%s", ProviderName, i.region, srv.ID)
return fmt.Sprintf("%s://%s/%s", ProviderName, region, srv.ID)
}
return fmt.Sprintf("%s:///%s", ProviderName, srv.ID)
}

func (i *InstancesV2) getInstance(ctx context.Context, node *v1.Node) (*servers.Server, error) {
if node.Spec.ProviderID == "" {
opt := servers.ListOpts{
Name: fmt.Sprintf("^%s$", node.Name),
func (i *InstancesV2) getInstance(_ context.Context, node *v1.Node) (*servers.Server, string, error) {
klog.V(4).InfoS("openstack.getInstance() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

instanceID, instanceRegion, err := instanceIDFromProviderID(node.Spec.ProviderID)
if err == nil && instanceRegion != "" {
if slices.Contains(i.regions, instanceRegion) {
return i.getInstanceByID(instanceID, []string{instanceRegion})
}

return nil, "", fmt.Errorf("getInstance: ProviderID \"%s\" didn't match supported regions \"%s\"", node.Spec.ProviderID, strings.Join(i.regions, ","))
}

// At this point we know that ProviderID is not properly set or it doesn't contain region information
// We need to search for the instance in all regions
var searchRegions []string

// We cannot trust the region label, so we need to check the region
instanceRegion = node.Labels[v1.LabelTopologyRegion]
if slices.Contains(i.regions, instanceRegion) {
searchRegions = []string{instanceRegion}
}

for _, r := range i.regions {
if r != instanceRegion {
searchRegions = append(searchRegions, r)
}
}

klog.V(6).InfoS("openstack.getInstance() trying to find the instance in regions", "node", klog.KObj(node),
"instanceID", instanceID,
"regions", strings.Join(searchRegions, ","))

if instanceID == "" {
return i.getInstanceByName(node, searchRegions)
}

return i.getInstanceByID(instanceID, searchRegions)
}

func (i *InstancesV2) getInstanceByID(instanceID string, searchRegions []string) (*servers.Server, string, error) {
server := servers.Server{}

mc := metrics.NewMetricContext("server", "get")
for _, r := range searchRegions {
err := servers.Get(context.TODO(), i.compute[r], instanceID).ExtractInto(&server)
if mc.ObserveRequest(err) != nil {
if errors.IsNotFound(err) {
continue
}

return nil, "", err
}
mc := metrics.NewMetricContext("server", "list")
allPages, err := servers.List(i.compute, opt).AllPages(context.TODO())

return &server, r, nil
}

return nil, "", cloudprovider.InstanceNotFound
}

func (i *InstancesV2) getInstanceByName(node *v1.Node, searchRegions []string) (*servers.Server, string, error) {
opt := servers.ListOpts{
Name: fmt.Sprintf("^%s$", node.Name),
}

serverList := make([]servers.Server, 0, 1)
mc := metrics.NewMetricContext("server", "list")

for _, r := range searchRegions {
allPages, err := servers.List(i.compute[r], opt).AllPages(context.TODO())
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("error listing servers %v: %v", opt, err)
return nil, "", fmt.Errorf("error listing servers %v: %v", opt, err)
}

serverList, err := servers.ExtractServers(allPages)
err = servers.ExtractServersInto(allPages, &serverList)
if err != nil {
return nil, fmt.Errorf("error extracting servers from pages: %v", err)
return nil, "", fmt.Errorf("error extracting servers from pages: %v", err)
}
if len(serverList) == 0 {
return nil, cloudprovider.InstanceNotFound
continue
}
if len(serverList) > 1 {
return nil, fmt.Errorf("getInstance: multiple instances found")
return nil, "", fmt.Errorf("getInstanceByName: multiple instances found")
}
return &serverList[0], nil
}

instanceID, instanceRegion, err := instanceIDFromProviderID(node.Spec.ProviderID)
if err != nil {
return nil, err
}

if instanceRegion != "" && instanceRegion != i.region {
return nil, fmt.Errorf("ProviderID \"%s\" didn't match supported region \"%s\"", node.Spec.ProviderID, i.region)
return &serverList[0], r, nil
}

mc := metrics.NewMetricContext("server", "get")
server, err := servers.Get(context.TODO(), i.compute, instanceID).Extract()
if mc.ObserveRequest(err) != nil {
if errors.IsNotFound(err) {
return nil, cloudprovider.InstanceNotFound
}
return nil, err
}
return server, nil
return nil, "", cloudprovider.InstanceNotFound
}
Loading

0 comments on commit 2f86fa7

Please sign in to comment.