Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APP-7128 support modules on windows #4605

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ full-static:

windows:
mkdir -p bin/windows
GOOS=windows go build -tags no_cgo -ldflags="-extldflags=-static $(COMMON_LDFLAGS)" -o bin/windows/viam-server-$(shell go env GOARCH) ./web/cmd/server
GOOS=windows go build -tags no_cgo -ldflags="-extldflags=-static $(COMMON_LDFLAGS)" -o bin/windows/viam-server-$(shell go env GOARCH).exe ./web/cmd/server
cd bin/windows && zip viam.zip viam-server-$(shell go env GOARCH).exe

server-static-compressed: server-static
upx --best --lzma $(BIN_OUTPUT_PATH)/viam-server
Expand Down
9 changes: 9 additions & 0 deletions ftdc/ftdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"path/filepath"
"regexp"
"runtime"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -207,6 +208,11 @@ func (ftdc *FTDC) Remove(name string) {
// Start spins off the background goroutine for collecting + writing FTDC data. It's normal for tests
// to _not_ call `Start`. Tests can simulate the same functionality by calling `constructDatum` and `writeDatum`.
func (ftdc *FTDC) Start() {
if runtime.GOOS == "windows" {
// note: this logs a panic on RDK start on windows.
ftdc.logger.Warn("FTDC not implemented on windows, not starting")
return
}
ftdc.readStatsWorker = utils.NewStoppableWorkerWithTicker(time.Second, ftdc.statsReader)
utils.PanicCapturingGo(ftdc.statsWriter)

Expand Down Expand Up @@ -276,6 +282,9 @@ func (ftdc *FTDC) statsWriter() {
// `statsWriter` by hand, without the `statsReader` can `close(ftdc.datumCh)` followed by
// `<-ftdc.outputWorkerDone` to stop+wait for the `statsWriter`.
func (ftdc *FTDC) StopAndJoin(ctx context.Context) {
if runtime.GOOS == "windows" {
return
}
ftdc.stopOnce.Do(func() {
// Only one caller should close the datum channel. And it should be the caller that called
// stop on the worker writing to the channel.
Expand Down
47 changes: 35 additions & 12 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -40,6 +41,9 @@ import (
rutils "go.viam.com/rdk/utils"
)

// tcpPortRange is the beginning of the port range. Only used when ViamTCPSockets() = true.
// The manager's nextPort counter starts at this value; each added module takes the current
// value as its listen port (on 127.0.0.1) and the counter is then incremented.
// If a port is already taken, the server breaks and behaves in an undefined way. Given this is
// for a pre-production windows release, that outcome is accepted for now. The allocation
// scheme is expected to become more robust in the future.
const tcpPortRange = 13500
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when a port is occupied?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a port is occupied, this breaks. Long term, we need a better way to do this: we could potentially request an unused port (:0) and have the module report it back to RDK via the parent socket, but that requires more plumbing.

Copy link
Member

@dgottlieb dgottlieb Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm perfectly cool with this. I'd recommend copying that expectation into the documentation:

tcpPortRange is the beginning of the port range. Only used when ViamTCPSockets() = true. If a port is already taken, the server breaks and behaves in an undefined way. Given this is for a pre-production windows release, we're ok with this outcome. The code/algorithm will become more robust in the future.


var (
validateConfigTimeout = 5 * time.Second
errMessageExitStatus143 = "exit status 143"
Expand Down Expand Up @@ -67,6 +71,7 @@ func NewManager(
restartCtx: restartCtx,
restartCtxCancel: restartCtxCancel,
packagesDir: options.PackagesDir,
nextPort: tcpPortRange,
}
}

Expand Down Expand Up @@ -99,6 +104,8 @@ type module struct {
inStartup atomic.Bool
inRecoveryLock sync.Mutex
logger logging.Logger
// port stores the listen port of this module when ViamTCPSockets() = true.
port int
}

type addedResource struct {
Expand Down Expand Up @@ -178,6 +185,8 @@ type Manager struct {
removeOrphanedResources func(ctx context.Context, rNames []resource.Name)
restartCtx context.Context
restartCtxCancel context.CancelFunc
// nextPort manages ports when ViamTCPSockets() = true.
nextPort int
}

// Close terminates module connections and processes.
Expand Down Expand Up @@ -318,7 +327,9 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, moduleLogger lo
dataDir: moduleDataDir,
resources: map[resource.Name]*addedResource{},
logger: moduleLogger,
port: mgr.nextPort,
}
mgr.nextPort++

if err := mgr.startModule(ctx, mod); err != nil {
return err
Expand Down Expand Up @@ -988,8 +999,12 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
func (m *module) dial() error {
// TODO(PRODUCT-343): session support probably means interceptors here
var err error
addrToDial := m.addr
if !rutils.TCPRegex.MatchString(addrToDial) {
addrToDial = "unix://" + m.addr
}
conn, err := grpc.Dial( //nolint:staticcheck
"unix://"+m.addr,
addrToDial,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(
rdkgrpc.EnsureTimeoutUnaryClientInterceptor,
Expand Down Expand Up @@ -1096,11 +1111,16 @@ func (m *module) startProcess(
packagesDir string,
) error {
var err error
// append a random alpha string to the module name while creating a socket address to avoid conflicts
// with old versions of the module.
if m.addr, err = modlib.CreateSocketAddress(
filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil {
return err

if rutils.ViamTCPSockets() {
m.addr = "127.0.0.1:" + strconv.Itoa(m.port)
} else {
// append a random alpha string to the module name while creating a socket address to avoid conflicts
// with old versions of the module.
if m.addr, err = modlib.CreateSocketAddress(
filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil {
return err
}
}

// We evaluate the Module's ExePath absolutely in the viam-server process so that
Expand Down Expand Up @@ -1164,12 +1184,15 @@ func (m *module) startProcess(
)
}
}
err = modlib.CheckSocketOwner(m.addr)
if errors.Is(err, fs.ErrNotExist) {
continue
}
if err != nil {
return errors.WithMessage(err, "module startup failed")
if !rutils.TCPRegex.MatchString(m.addr) {
abe-winter marked this conversation as resolved.
Show resolved Hide resolved
// note: we don't do this check in TCP mode because TCP addresses are not file paths and will fail check.
err = modlib.CheckSocketOwner(m.addr)
if errors.Is(err, fs.ErrNotExist) {
continue
}
if err != nil {
return errors.WithMessage(err, "module startup failed")
}
}
break
}
Expand Down
Loading
Loading