diff --git a/src/viam/sdk/module/service.cpp b/src/viam/sdk/module/service.cpp index 430fbf2b7..06dbf80fb 100644 --- a/src/viam/sdk/module/service.cpp +++ b/src/viam/sdk/module/service.cpp @@ -42,185 +42,193 @@ namespace viam { namespace sdk { -Dependencies ModuleService::get_dependencies_( - google::protobuf::RepeatedPtrField const& proto, - std::string const& resource_name) { - Dependencies deps; - for (const auto& dep : proto) { - auto dep_name = Name::from_string(dep); - const std::shared_ptr dep_resource = get_parent_resource_(dep_name); - if (!dep_resource) { - std::ostringstream buffer; - buffer << resource_name << ": Dependency " - << "`" << dep_name << "` was not found during (re)configuration"; - throw Exception(ErrorCondition::k_resource_not_found, buffer.str()); +struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { + ServiceImpl(ModuleService& p) : parent(p) {} + + ModuleService& parent; + + // TODO(RSDK-6528) - to the extent possible, switch to using `server_helper` + ::grpc::Status AddResource(::grpc::ServerContext*, + const ::viam::module::v1::AddResourceRequest* request, + ::viam::module::v1::AddResourceResponse*) override { + const viam::app::v1::ComponentConfig& proto = request->config(); + const ResourceConfig cfg = v2::from_proto(proto); + const std::lock_guard lock(parent.lock_); + + std::shared_ptr res; + const Dependencies deps = parent.get_dependencies_(request->dependencies(), cfg.name()); + const std::shared_ptr reg = + Registry::lookup_model(cfg.api(), cfg.model()); + if (reg) { + try { + res = reg->construct_resource(deps, cfg); + } catch (const std::exception& exc) { + return grpc::Status(::grpc::INTERNAL, exc.what()); + } + }; + try { + parent.server_->add_resource(res); + } catch (const std::exception& exc) { + return grpc::Status(::grpc::INTERNAL, exc.what()); } - deps.emplace(dep_name, dep_resource); - } - return deps; -} -std::shared_ptr ModuleService::get_parent_resource_(const Name& name) { - if (!parent_) { - parent_ = RobotClient::at_local_socket(parent_addr_, {0, boost::none}); + return grpc::Status(); } - return parent_->resource_by_name(name); -} + ::grpc::Status ReconfigureResource( + ::grpc::ServerContext*, + const ::viam::module::v1::ReconfigureResourceRequest* request, + ::viam::module::v1::ReconfigureResourceResponse*) override { + const viam::app::v1::ComponentConfig& proto = request->config(); + ResourceConfig cfg = v2::from_proto(proto); -// TODO(RSDK-6528) - to the extent possible, switch to using `server_helper` -::grpc::Status ModuleService::AddResource(::grpc::ServerContext*, - const ::viam::module::v1::AddResourceRequest* request, - ::viam::module::v1::AddResourceResponse*) { - const viam::app::v1::ComponentConfig& proto = request->config(); - const ResourceConfig cfg = v2::from_proto(proto); - const std::lock_guard lock(lock_); + const Dependencies deps = parent.get_dependencies_(request->dependencies(), cfg.name()); - std::shared_ptr res; - const Dependencies deps = get_dependencies_(request->dependencies(), cfg.name()); - const std::shared_ptr reg = - Registry::lookup_model(cfg.api(), cfg.model()); - if (reg) { + auto resource_server = parent.server_->lookup_resource_server(cfg.api()); + if (!resource_server) { + return grpc::Status(grpc::UNKNOWN, + "no rpc service for config: " + cfg.api().to_string()); + } + auto manager = resource_server->resource_manager(); + + // see if our resource is reconfigurable. if it is, reconfigure + const std::shared_ptr res = manager->resource(cfg.resource_name().name()); + if (!res) { + return grpc::Status(grpc::UNKNOWN, + "unable to reconfigure resource " + cfg.resource_name().name() + + " as it doesn't exist."); + } try { - res = reg->construct_resource(deps, cfg); + Reconfigurable::reconfigure_if_reconfigurable(res, deps, cfg); + return grpc::Status(); } catch (const std::exception& exc) { return grpc::Status(::grpc::INTERNAL, exc.what()); } - }; - try { - server_->add_resource(res); - } catch (const std::exception& exc) { - return grpc::Status(::grpc::INTERNAL, exc.what()); - } - - return grpc::Status(); -}; -::grpc::Status ModuleService::ReconfigureResource( - ::grpc::ServerContext*, - const ::viam::module::v1::ReconfigureResourceRequest* request, - ::viam::module::v1::ReconfigureResourceResponse*) { - const viam::app::v1::ComponentConfig& proto = request->config(); - ResourceConfig cfg = v2::from_proto(proto); + // if the type isn't reconfigurable by default, replace it + try { + Stoppable::stop_if_stoppable(res); + } catch (const std::exception& err) { + BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what(); + } - const Dependencies deps = get_dependencies_(request->dependencies(), cfg.name()); + const std::shared_ptr reg = Registry::lookup_model(cfg.name()); + if (reg) { + try { + const std::shared_ptr res = reg->construct_resource(deps, cfg); + manager->replace_one(cfg.resource_name(), res); + } catch (const std::exception& exc) { + return grpc::Status(::grpc::INTERNAL, exc.what()); + } + } - auto resource_server = server_->lookup_resource_server(cfg.api()); - if (!resource_server) { - return grpc::Status(grpc::UNKNOWN, "no rpc service for config: " + cfg.api().to_string()); - } - auto manager = resource_server->resource_manager(); - - // see if our resource is reconfigurable. if it is, reconfigure - const std::shared_ptr res = manager->resource(cfg.resource_name().name()); - if (!res) { - return grpc::Status(grpc::UNKNOWN, - "unable to reconfigure resource " + cfg.resource_name().name() + - " as it doesn't exist."); - } - try { - Reconfigurable::reconfigure_if_reconfigurable(res, deps, cfg); return grpc::Status(); - } catch (const std::exception& exc) { - return grpc::Status(::grpc::INTERNAL, exc.what()); - } - - // if the type isn't reconfigurable by default, replace it - try { - Stoppable::stop_if_stoppable(res); - } catch (const std::exception& err) { - BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what(); } - const std::shared_ptr reg = Registry::lookup_model(cfg.name()); - if (reg) { + ::grpc::Status ValidateConfig(::grpc::ServerContext*, + const ::viam::module::v1::ValidateConfigRequest* request, + ::viam::module::v1::ValidateConfigResponse* response) override { + const viam::app::v1::ComponentConfig& proto = request->config(); + ResourceConfig cfg = v2::from_proto(proto); + + const std::shared_ptr reg = + Registry::lookup_model(cfg.api(), cfg.model()); + if (!reg) { + return grpc::Status(grpc::UNKNOWN, + "unable to validate resource " + cfg.resource_name().name() + + " as it hasn't been registered."); + } try { - const std::shared_ptr res = reg->construct_resource(deps, cfg); - manager->replace_one(cfg.resource_name(), res); - } catch (const std::exception& exc) { - return grpc::Status(::grpc::INTERNAL, exc.what()); + const std::vector implicit_deps = reg->validate(cfg); + for (const auto& dep : implicit_deps) { + response->add_dependencies(dep); + } + } catch (const std::exception& err) { + return grpc::Status(grpc::UNKNOWN, + "validation failure in resource " + cfg.name() + ": " + err.what()); } + return grpc::Status(); } - return grpc::Status(); -}; + ::grpc::Status RemoveResource(::grpc::ServerContext*, + const ::viam::module::v1::RemoveResourceRequest* request, + ::viam::module::v1::RemoveResourceResponse*) override { + auto name = Name::from_string(request->name()); + auto resource_server = parent.server_->lookup_resource_server(name.api()); + if (!resource_server) { + return grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string()); + } + const std::shared_ptr manager = resource_server->resource_manager(); + const std::shared_ptr res = manager->resource(name.name()); + if (!res) { + return grpc::Status( + grpc::UNKNOWN, + "unable to remove resource " + name.to_string() + " as it doesn't exist."); + } -::grpc::Status ModuleService::ValidateConfig( - ::grpc::ServerContext*, - const ::viam::module::v1::ValidateConfigRequest* request, - ::viam::module::v1::ValidateConfigResponse* response) { - const viam::app::v1::ComponentConfig& proto = request->config(); - ResourceConfig cfg = v2::from_proto(proto); - - const std::shared_ptr reg = - Registry::lookup_model(cfg.api(), cfg.model()); - if (!reg) { - return grpc::Status(grpc::UNKNOWN, - "unable to validate resource " + cfg.resource_name().name() + - " as it hasn't been registered."); - } - try { - const std::vector implicit_deps = reg->validate(cfg); - for (const auto& dep : implicit_deps) { - response->add_dependencies(dep); + try { + Stoppable::stop_if_stoppable(res); + } catch (const std::exception& err) { + BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what(); } - } catch (const std::exception& err) { - return grpc::Status(grpc::UNKNOWN, - "validation failure in resource " + cfg.name() + ": " + err.what()); - } - return grpc::Status(); -}; -::grpc::Status ModuleService::RemoveResource( - ::grpc::ServerContext*, - const ::viam::module::v1::RemoveResourceRequest* request, - ::viam::module::v1::RemoveResourceResponse*) { - auto name = Name::from_string(request->name()); - auto resource_server = server_->lookup_resource_server(name.api()); - if (!resource_server) { - return grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string()); + manager->remove(name); + return grpc::Status(); } - const std::shared_ptr manager = resource_server->resource_manager(); - const std::shared_ptr res = manager->resource(name.name()); - if (!res) { - return grpc::Status( - grpc::UNKNOWN, - "unable to remove resource " + name.to_string() + " as it doesn't exist."); + + ::grpc::Status Ready(::grpc::ServerContext*, + const ::viam::module::v1::ReadyRequest* request, + ::viam::module::v1::ReadyResponse* response) override { + const std::lock_guard lock(parent.lock_); + const viam::module::v1::HandlerMap hm = parent.module_->handles().to_proto(); + *response->mutable_handlermap() = hm; + parent.parent_addr_ = request->parent_address(); + response->set_ready(parent.module_->ready()); + return grpc::Status(); } +}; - try { - Stoppable::stop_if_stoppable(res); - } catch (const std::exception& err) { - BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what(); +Dependencies ModuleService::get_dependencies_( + google::protobuf::RepeatedPtrField const& proto, + std::string const& resource_name) { + Dependencies deps; + for (const auto& dep : proto) { + auto dep_name = Name::from_string(dep); + const std::shared_ptr dep_resource = get_parent_resource_(dep_name); + if (!dep_resource) { + std::ostringstream buffer; + buffer << resource_name << ": Dependency " + << "`" << dep_name << "` was not found during (re)configuration"; + throw Exception(ErrorCondition::k_resource_not_found, buffer.str()); + } + deps.emplace(dep_name, dep_resource); } + return deps; +} - manager->remove(name); - return grpc::Status(); -}; +std::shared_ptr ModuleService::get_parent_resource_(const Name& name) { + if (!parent_) { + parent_ = RobotClient::at_local_socket(parent_addr_, {0, boost::none}); + } -::grpc::Status ModuleService::Ready(::grpc::ServerContext*, - const ::viam::module::v1::ReadyRequest* request, - ::viam::module::v1::ReadyResponse* response) { - const std::lock_guard lock(lock_); - const viam::module::v1::HandlerMap hm = this->module_->handles().to_proto(); - *response->mutable_handlermap() = hm; - parent_addr_ = request->parent_address(); - response->set_ready(module_->ready()); - return grpc::Status(); -}; + return parent_->resource_by_name(name); +} ModuleService::ModuleService(std::string addr) - : module_(std::make_unique(std::move(addr))), server_(std::make_unique()) {} + : module_(std::make_unique(std::move(addr))), server_(std::make_unique()) { + impl_ = std::make_unique(*this); +} ModuleService::ModuleService(int argc, char** argv, - const std::vector>& registrations) { - if (argc < 2) { - throw Exception(ErrorCondition::k_connection, "Need socket path as command line argument"); - } - module_ = std::make_unique(argv[1]); - server_ = std::make_unique(); - signal_manager_ = SignalManager(); + const std::vector>& registrations) + : ModuleService([argc, argv] { + if (argc < 2) { + throw Exception(ErrorCondition::k_connection, + "Need socket path as command line argument"); + } + return argv[1]; + }()) { set_logger_severity_from_args(argc, argv); for (auto&& mr : registrations) { @@ -229,13 +237,27 @@ ModuleService::ModuleService(int argc, } } +ModuleService::~ModuleService() { + // TODO(RSDK-5509): Run registered cleanup functions here. + BOOST_LOG_TRIVIAL(info) << "Shutting down gracefully."; + server_->shutdown(); + + if (parent_) { + try { + parent_->close(); + } catch (const std::exception& exc) { + BOOST_LOG_TRIVIAL(error) << exc.what(); + } + } +} + void ModuleService::serve() { const mode_t old_mask = umask(0077); const int sockfd = socket(AF_UNIX, SOCK_STREAM, 0); listen(sockfd, 10); umask(old_mask); - server_->register_service(this); + server_->register_service(impl_.get()); const std::string address = "unix://" + module_->addr(); server_->add_listening_port(address); @@ -249,20 +271,6 @@ void ModuleService::serve() { signal_manager_.wait(); } -ModuleService::~ModuleService() { - // TODO(RSDK-5509): Run registered cleanup functions here. - BOOST_LOG_TRIVIAL(info) << "Shutting down gracefully."; - server_->shutdown(); - - if (parent_) { - try { - parent_->close(); - } catch (const std::exception& exc) { - BOOST_LOG_TRIVIAL(error) << exc.what(); - } - } -} - void ModuleService::add_model_from_registry_inlock_(API api, Model model, const std::lock_guard&) { diff --git a/src/viam/sdk/module/service.hpp b/src/viam/sdk/module/service.hpp index 4925e279c..985c93e09 100644 --- a/src/viam/sdk/module/service.hpp +++ b/src/viam/sdk/module/service.hpp @@ -2,9 +2,6 @@ #include -#include -#include - #include #include #include @@ -21,7 +18,7 @@ namespace sdk { /// can construct a ModuleService and use its associated methods to write /// a working C++ module. See examples under `src/viam/examples/modules`. /// @ingroup Module -class ModuleService : viam::module::v1::ModuleService::Service { +class ModuleService { public: /// @brief Creates a new ModuleService that can serve on the provided socket. /// @param addr Address of socket to serve on. @@ -50,26 +47,8 @@ class ModuleService : viam::module::v1::ModuleService::Service { void add_model_from_registry(API api, Model model); private: - ::grpc::Status AddResource(::grpc::ServerContext* context, - const ::viam::module::v1::AddResourceRequest* request, - ::viam::module::v1::AddResourceResponse* response) override; - - ::grpc::Status ReconfigureResource( - ::grpc::ServerContext* context, - const ::viam::module::v1::ReconfigureResourceRequest* request, - ::viam::module::v1::ReconfigureResourceResponse* response) override; - - ::grpc::Status RemoveResource(::grpc::ServerContext* context, - const ::viam::module::v1::RemoveResourceRequest* request, - ::viam::module::v1::RemoveResourceResponse* response) override; - - ::grpc::Status Ready(::grpc::ServerContext* context, - const ::viam::module::v1::ReadyRequest* request, - ::viam::module::v1::ReadyResponse* response) override; - - ::grpc::Status ValidateConfig(::grpc::ServerContext* context, - const ::viam::module::v1::ValidateConfigRequest* request, - ::viam::module::v1::ValidateConfigResponse* response) override; + struct ServiceImpl; + friend ModuleService::ServiceImpl; void add_model_from_registry_inlock_(API api, Model model, const std::lock_guard&); Dependencies get_dependencies_(google::protobuf::RepeatedPtrField const& proto, @@ -82,6 +61,8 @@ class ModuleService : viam::module::v1::ModuleService::Service { std::string parent_addr_; std::unique_ptr server_; SignalManager signal_manager_; + + std::unique_ptr impl_; }; } // namespace sdk