Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module service insulate #343

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 168 additions & 160 deletions src/viam/sdk/module/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,185 +42,193 @@
namespace viam {
namespace sdk {

Dependencies ModuleService::get_dependencies_(
google::protobuf::RepeatedPtrField<std::string> const& proto,
std::string const& resource_name) {
Dependencies deps;
for (const auto& dep : proto) {
auto dep_name = Name::from_string(dep);
const std::shared_ptr<Resource> dep_resource = get_parent_resource_(dep_name);
if (!dep_resource) {
std::ostringstream buffer;
buffer << resource_name << ": Dependency "
<< "`" << dep_name << "` was not found during (re)configuration";
throw Exception(ErrorCondition::k_resource_not_found, buffer.str());
struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service {
ServiceImpl(ModuleService& p) : parent(p) {}

ModuleService& parent;

// TODO(RSDK-6528) - to the extent possible, switch to using `server_helper`
::grpc::Status AddResource(::grpc::ServerContext*,
const ::viam::module::v1::AddResourceRequest* request,
::viam::module::v1::AddResourceResponse*) override {
const viam::app::v1::ComponentConfig& proto = request->config();
const ResourceConfig cfg = v2::from_proto(proto);
const std::lock_guard<std::mutex> lock(parent.lock_);

std::shared_ptr<Resource> res;
const Dependencies deps = parent.get_dependencies_(request->dependencies(), cfg.name());
const std::shared_ptr<const ModelRegistration> reg =
Registry::lookup_model(cfg.api(), cfg.model());
if (reg) {
try {
res = reg->construct_resource(deps, cfg);
} catch (const std::exception& exc) {
return grpc::Status(::grpc::INTERNAL, exc.what());
}
};
try {
parent.server_->add_resource(res);
} catch (const std::exception& exc) {
return grpc::Status(::grpc::INTERNAL, exc.what());
}
deps.emplace(dep_name, dep_resource);
}
return deps;
}

std::shared_ptr<Resource> ModuleService::get_parent_resource_(const Name& name) {
if (!parent_) {
parent_ = RobotClient::at_local_socket(parent_addr_, {0, boost::none});
return grpc::Status();
}

return parent_->resource_by_name(name);
}
::grpc::Status ReconfigureResource(
::grpc::ServerContext*,
const ::viam::module::v1::ReconfigureResourceRequest* request,
::viam::module::v1::ReconfigureResourceResponse*) override {
const viam::app::v1::ComponentConfig& proto = request->config();
ResourceConfig cfg = v2::from_proto(proto);

// TODO(RSDK-6528) - to the extent possible, switch to using `server_helper`
::grpc::Status ModuleService::AddResource(::grpc::ServerContext*,
const ::viam::module::v1::AddResourceRequest* request,
::viam::module::v1::AddResourceResponse*) {
const viam::app::v1::ComponentConfig& proto = request->config();
const ResourceConfig cfg = v2::from_proto(proto);
const std::lock_guard<std::mutex> lock(lock_);
const Dependencies deps = parent.get_dependencies_(request->dependencies(), cfg.name());

std::shared_ptr<Resource> res;
const Dependencies deps = get_dependencies_(request->dependencies(), cfg.name());
const std::shared_ptr<const ModelRegistration> reg =
Registry::lookup_model(cfg.api(), cfg.model());
if (reg) {
auto resource_server = parent.server_->lookup_resource_server(cfg.api());
if (!resource_server) {
return grpc::Status(grpc::UNKNOWN,
"no rpc service for config: " + cfg.api().to_string());
}
auto manager = resource_server->resource_manager();

// see if our resource is reconfigurable. if it is, reconfigure
const std::shared_ptr<Resource> res = manager->resource(cfg.resource_name().name());
if (!res) {
return grpc::Status(grpc::UNKNOWN,
"unable to reconfigure resource " + cfg.resource_name().name() +
" as it doesn't exist.");
}
try {
res = reg->construct_resource(deps, cfg);
Reconfigurable::reconfigure_if_reconfigurable(res, deps, cfg);
return grpc::Status();
} catch (const std::exception& exc) {
return grpc::Status(::grpc::INTERNAL, exc.what());
}
};
try {
server_->add_resource(res);
} catch (const std::exception& exc) {
return grpc::Status(::grpc::INTERNAL, exc.what());
}

return grpc::Status();
};

::grpc::Status ModuleService::ReconfigureResource(
::grpc::ServerContext*,
const ::viam::module::v1::ReconfigureResourceRequest* request,
::viam::module::v1::ReconfigureResourceResponse*) {
const viam::app::v1::ComponentConfig& proto = request->config();
ResourceConfig cfg = v2::from_proto(proto);
// if the type isn't reconfigurable by default, replace it
try {
Stoppable::stop_if_stoppable(res);
} catch (const std::exception& err) {
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
}

const Dependencies deps = get_dependencies_(request->dependencies(), cfg.name());
const std::shared_ptr<const ModelRegistration> reg = Registry::lookup_model(cfg.name());
if (reg) {
try {
const std::shared_ptr<Resource> res = reg->construct_resource(deps, cfg);
manager->replace_one(cfg.resource_name(), res);
} catch (const std::exception& exc) {
return grpc::Status(::grpc::INTERNAL, exc.what());
}
}

auto resource_server = server_->lookup_resource_server(cfg.api());
if (!resource_server) {
return grpc::Status(grpc::UNKNOWN, "no rpc service for config: " + cfg.api().to_string());
}
auto manager = resource_server->resource_manager();

// see if our resource is reconfigurable. if it is, reconfigure
const std::shared_ptr<Resource> res = manager->resource(cfg.resource_name().name());
if (!res) {
return grpc::Status(grpc::UNKNOWN,
"unable to reconfigure resource " + cfg.resource_name().name() +
" as it doesn't exist.");
}
try {
Reconfigurable::reconfigure_if_reconfigurable(res, deps, cfg);
return grpc::Status();
} catch (const std::exception& exc) {
return grpc::Status(::grpc::INTERNAL, exc.what());
}

// if the type isn't reconfigurable by default, replace it
try {
Stoppable::stop_if_stoppable(res);
} catch (const std::exception& err) {
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
}

const std::shared_ptr<const ModelRegistration> reg = Registry::lookup_model(cfg.name());
if (reg) {
::grpc::Status ValidateConfig(::grpc::ServerContext*,
const ::viam::module::v1::ValidateConfigRequest* request,
::viam::module::v1::ValidateConfigResponse* response) override {
const viam::app::v1::ComponentConfig& proto = request->config();
ResourceConfig cfg = v2::from_proto(proto);

const std::shared_ptr<const ModelRegistration> reg =
Registry::lookup_model(cfg.api(), cfg.model());
if (!reg) {
return grpc::Status(grpc::UNKNOWN,
"unable to validate resource " + cfg.resource_name().name() +
" as it hasn't been registered.");
}
try {
const std::shared_ptr<Resource> res = reg->construct_resource(deps, cfg);
manager->replace_one(cfg.resource_name(), res);
} catch (const std::exception& exc) {
return grpc::Status(::grpc::INTERNAL, exc.what());
const std::vector<std::string> implicit_deps = reg->validate(cfg);
for (const auto& dep : implicit_deps) {
response->add_dependencies(dep);
}
} catch (const std::exception& err) {
return grpc::Status(grpc::UNKNOWN,
"validation failure in resource " + cfg.name() + ": " + err.what());
}
return grpc::Status();
}

return grpc::Status();
};
::grpc::Status RemoveResource(::grpc::ServerContext*,
const ::viam::module::v1::RemoveResourceRequest* request,
::viam::module::v1::RemoveResourceResponse*) override {
auto name = Name::from_string(request->name());
auto resource_server = parent.server_->lookup_resource_server(name.api());
if (!resource_server) {
return grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string());
}
const std::shared_ptr<ResourceManager> manager = resource_server->resource_manager();
const std::shared_ptr<Resource> res = manager->resource(name.name());
if (!res) {
return grpc::Status(
grpc::UNKNOWN,
"unable to remove resource " + name.to_string() + " as it doesn't exist.");
}

::grpc::Status ModuleService::ValidateConfig(
::grpc::ServerContext*,
const ::viam::module::v1::ValidateConfigRequest* request,
::viam::module::v1::ValidateConfigResponse* response) {
const viam::app::v1::ComponentConfig& proto = request->config();
ResourceConfig cfg = v2::from_proto(proto);

const std::shared_ptr<const ModelRegistration> reg =
Registry::lookup_model(cfg.api(), cfg.model());
if (!reg) {
return grpc::Status(grpc::UNKNOWN,
"unable to validate resource " + cfg.resource_name().name() +
" as it hasn't been registered.");
}
try {
const std::vector<std::string> implicit_deps = reg->validate(cfg);
for (const auto& dep : implicit_deps) {
response->add_dependencies(dep);
try {
Stoppable::stop_if_stoppable(res);
} catch (const std::exception& err) {
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
}
} catch (const std::exception& err) {
return grpc::Status(grpc::UNKNOWN,
"validation failure in resource " + cfg.name() + ": " + err.what());
}
return grpc::Status();
};

::grpc::Status ModuleService::RemoveResource(
::grpc::ServerContext*,
const ::viam::module::v1::RemoveResourceRequest* request,
::viam::module::v1::RemoveResourceResponse*) {
auto name = Name::from_string(request->name());
auto resource_server = server_->lookup_resource_server(name.api());
if (!resource_server) {
return grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string());
manager->remove(name);
return grpc::Status();
}
const std::shared_ptr<ResourceManager> manager = resource_server->resource_manager();
const std::shared_ptr<Resource> res = manager->resource(name.name());
if (!res) {
return grpc::Status(
grpc::UNKNOWN,
"unable to remove resource " + name.to_string() + " as it doesn't exist.");

::grpc::Status Ready(::grpc::ServerContext*,
const ::viam::module::v1::ReadyRequest* request,
::viam::module::v1::ReadyResponse* response) override {
const std::lock_guard<std::mutex> lock(parent.lock_);
const viam::module::v1::HandlerMap hm = parent.module_->handles().to_proto();
*response->mutable_handlermap() = hm;
parent.parent_addr_ = request->parent_address();
response->set_ready(parent.module_->ready());
return grpc::Status();
}
};

try {
Stoppable::stop_if_stoppable(res);
} catch (const std::exception& err) {
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
Dependencies ModuleService::get_dependencies_(
google::protobuf::RepeatedPtrField<std::string> const& proto,
std::string const& resource_name) {
Dependencies deps;
for (const auto& dep : proto) {
auto dep_name = Name::from_string(dep);
const std::shared_ptr<Resource> dep_resource = get_parent_resource_(dep_name);
if (!dep_resource) {
std::ostringstream buffer;
buffer << resource_name << ": Dependency "
<< "`" << dep_name << "` was not found during (re)configuration";
throw Exception(ErrorCondition::k_resource_not_found, buffer.str());
}
deps.emplace(dep_name, dep_resource);
}
return deps;
}

manager->remove(name);
return grpc::Status();
};
std::shared_ptr<Resource> ModuleService::get_parent_resource_(const Name& name) {
if (!parent_) {
parent_ = RobotClient::at_local_socket(parent_addr_, {0, boost::none});
}

::grpc::Status ModuleService::Ready(::grpc::ServerContext*,
const ::viam::module::v1::ReadyRequest* request,
::viam::module::v1::ReadyResponse* response) {
const std::lock_guard<std::mutex> lock(lock_);
const viam::module::v1::HandlerMap hm = this->module_->handles().to_proto();
*response->mutable_handlermap() = hm;
parent_addr_ = request->parent_address();
response->set_ready(module_->ready());
return grpc::Status();
};
return parent_->resource_by_name(name);
}

ModuleService::ModuleService(std::string addr)
: module_(std::make_unique<Module>(std::move(addr))), server_(std::make_unique<Server>()) {}
: module_(std::make_unique<Module>(std::move(addr))), server_(std::make_unique<Server>()) {
impl_ = std::make_unique<ServiceImpl>(*this);
}

ModuleService::ModuleService(int argc,
char** argv,
const std::vector<std::shared_ptr<ModelRegistration>>& registrations) {
if (argc < 2) {
throw Exception(ErrorCondition::k_connection, "Need socket path as command line argument");
}
module_ = std::make_unique<Module>(argv[1]);
server_ = std::make_unique<Server>();
signal_manager_ = SignalManager();
const std::vector<std::shared_ptr<ModelRegistration>>& registrations)
: ModuleService([argc, argv] {
if (argc < 2) {
throw Exception(ErrorCondition::k_connection,
"Need socket path as command line argument");
}
return argv[1];
}()) {
set_logger_severity_from_args(argc, argv);

for (auto&& mr : registrations) {
Expand All @@ -229,13 +237,27 @@ ModuleService::ModuleService(int argc,
}
}

ModuleService::~ModuleService() {
// TODO(RSDK-5509): Run registered cleanup functions here.
BOOST_LOG_TRIVIAL(info) << "Shutting down gracefully.";
server_->shutdown();

if (parent_) {
try {
parent_->close();
} catch (const std::exception& exc) {
BOOST_LOG_TRIVIAL(error) << exc.what();
}
}
}

void ModuleService::serve() {
const mode_t old_mask = umask(0077);
const int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
listen(sockfd, 10);
umask(old_mask);

server_->register_service(this);
server_->register_service(impl_.get());
const std::string address = "unix://" + module_->addr();
server_->add_listening_port(address);

Expand All @@ -249,20 +271,6 @@ void ModuleService::serve() {
signal_manager_.wait();
}

ModuleService::~ModuleService() {
// TODO(RSDK-5509): Run registered cleanup functions here.
BOOST_LOG_TRIVIAL(info) << "Shutting down gracefully.";
server_->shutdown();

if (parent_) {
try {
parent_->close();
} catch (const std::exception& exc) {
BOOST_LOG_TRIVIAL(error) << exc.what();
}
}
}

void ModuleService::add_model_from_registry_inlock_(API api,
Model model,
const std::lock_guard<std::mutex>&) {
Expand Down
Loading
Loading