Skip to content

Commit

Permalink
Merge branch 'refs/heads/apache-3.3' into 3.3.0-beta.5-release
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed Aug 5, 2024
2 parents 7dd25ec + a64202c commit 29ebef8
Show file tree
Hide file tree
Showing 26 changed files with 354 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public interface DataStore {
void put(String componentName, String key, Object value);

void remove(String componentName, String key);

default void addListener(DataStoreUpdateListener dataStoreUpdateListener) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.store;

public interface DataStoreUpdateListener {
void onUpdate(String componentName, String key, Object value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@
*/
package org.apache.dubbo.common.store.support;

import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.store.DataStore;
import org.apache.dubbo.common.store.DataStoreUpdateListener;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SimpleDataStore implements DataStore {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(SimpleDataStore.class);

// <component name or id, <data-name, data-value>>
private final ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<>();
private final ConcurrentHashSet<DataStoreUpdateListener> listeners = new ConcurrentHashSet<>();

@Override
public Map<String, Object> get(String componentName) {
Expand All @@ -52,6 +59,7 @@ public void put(String componentName, String key, Object value) {
Map<String, Object> componentData =
ConcurrentHashMapUtils.computeIfAbsent(data, componentName, k -> new ConcurrentHashMap<>());
componentData.put(key, value);
notifyListeners(componentName, key, value);
}

@Override
Expand All @@ -60,5 +68,27 @@ public void remove(String componentName, String key) {
return;
}
data.get(componentName).remove(key);
notifyListeners(componentName, key, null);
}

@Override
public void addListener(DataStoreUpdateListener dataStoreUpdateListener) {
listeners.add(dataStoreUpdateListener);
}

private void notifyListeners(String componentName, String key, Object value) {
for (DataStoreUpdateListener listener : listeners) {
try {
listener.onUpdate(componentName, key, value);
} catch (Throwable t) {
logger.warn(
LoggerCodeConstants.INTERNAL_ERROR,
"",
"",
"Failed to notify data store update listener. " + "ComponentName: " + componentName + " Key: "
+ key,
t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,9 @@ private void dumpJStack() {
jstack(jStackStream);
} catch (Exception t) {
logger.error(COMMON_UNEXPECTED_CREATE_DUMP, "", "", "dump jStack error", t);
} finally {
lastPrintTime = System.currentTimeMillis();
}
});
lastPrintTime = System.currentTimeMillis();
} finally {
guard.release();
// must shut down thread pool ,if not will lead to OOM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,12 @@ public interface Constants {

String SERVER_THREAD_POOL_NAME = "DubboServerHandler";

String SERVER_THREAD_POOL_PREFIX = SERVER_THREAD_POOL_NAME + "-";

String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";

String CLIENT_THREAD_POOL_PREFIX = CLIENT_THREAD_POOL_NAME + "-";

String REST_PROTOCOL = "rest";

String DEFAULT_NATIVE_COMPILER = "jdk";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
*/
package org.apache.dubbo.common.store.support;

import org.apache.dubbo.common.store.DataStoreUpdateListener;

import java.util.Map;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -57,4 +61,30 @@ void testGetComponent() throws Exception {
dataStore.remove("component", "key");
assertNotEquals(map, dataStore.get("component"));
}

@Test
void testNotify() {
DataStoreUpdateListener listener = Mockito.mock(DataStoreUpdateListener.class);
dataStore.addListener(listener);

ArgumentCaptor<String> componentNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Object> valueCaptor = ArgumentCaptor.forClass(Object.class);

dataStore.put("name", "key", "1");
Mockito.verify(listener).onUpdate(componentNameCaptor.capture(), keyCaptor.capture(), valueCaptor.capture());
assertEquals("name", componentNameCaptor.getValue());
assertEquals("key", keyCaptor.getValue());
assertEquals("1", valueCaptor.getValue());

dataStore.remove("name", "key");
Mockito.verify(listener, Mockito.times(2))
.onUpdate(componentNameCaptor.capture(), keyCaptor.capture(), valueCaptor.capture());
assertEquals("name", componentNameCaptor.getValue());
assertEquals("key", keyCaptor.getValue());
assertNull(valueCaptor.getValue());

dataStore.remove("name2", "key");
Mockito.verify(listener, Mockito.times(0)).onUpdate("name2", "key", null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@
package org.apache.dubbo.common.threadpool.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.event.ThreadPoolExhaustedEvent;
import org.apache.dubbo.common.threadpool.event.ThreadPoolExhaustedListener;
import org.apache.dubbo.common.utils.SystemPropertyConfigUtils;

import java.io.FileOutputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.apache.dubbo.common.constants.CommonConstants.OS_WIN_PREFIX;
Expand All @@ -39,6 +47,11 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

class AbortPolicyWithReportTest {
@BeforeEach
public void setUp() {
AbortPolicyWithReport.lastPrintTime = 0;
}

@Test
void jStackDumpTest() {
URL url = URL.valueOf(
Expand All @@ -62,6 +75,61 @@ protected void jstack(FileOutputStream jStackStream) {
Assertions.assertNotNull(fileOutputStream.get());
}

@Test
void jStack_ConcurrencyDump_Silence_10Min() {
URL url = URL.valueOf(
"dubbo://admin:[email protected]:20880/context/path?dump.directory=/tmp&version=1.0.0&application=morgan&noValue=");
AtomicInteger jStackCount = new AtomicInteger(0);
AtomicInteger failureCount = new AtomicInteger(0);
AtomicInteger finishedCount = new AtomicInteger(0);
AtomicInteger timeoutCount = new AtomicInteger(0);
AbortPolicyWithReport abortPolicyWithReport = new AbortPolicyWithReport("Test", url) {
@Override
protected void jstack(FileOutputStream jStackStream) {
jStackCount.incrementAndGet();
// try to simulate the jstack cost long time, so that AbortPolicyWithReport may jstack repeatedly.
long startTime = System.currentTimeMillis();
await().atLeast(200, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis() - startTime >= 300);
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
4,
4,
0,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new NamedInternalThreadFactory("jStack_ConcurrencyDump_Silence_10Min", false),
abortPolicyWithReport);
int runTimes = 100;
List<Future<?>> futureList = new LinkedList<>();
for (int i = 0; i < runTimes; i++) {
try {
futureList.add(threadPoolExecutor.submit(() -> {
finishedCount.incrementAndGet();
long start = System.currentTimeMillis();
// try to await 1s to make sure jstack dump thread scheduled
await().atLeast(300, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis() - start >= 300);
}));
} catch (Exception ignored) {
failureCount.incrementAndGet();
}
}
futureList.forEach(f -> {
try {
f.get(500, TimeUnit.MILLISECONDS);
} catch (Exception ignored) {
timeoutCount.incrementAndGet();
}
});

System.out.printf(
"jStackCount: %d, finishedCount: %d, failureCount: %d, timeoutCount: %d %n",
jStackCount.get(), finishedCount.get(), failureCount.get(), timeoutCount.get());
Assertions.assertEquals(
runTimes, finishedCount.get() + failureCount.get(), "all the test thread should be run completely");
Assertions.assertEquals(1, jStackCount.get(), "'jstack' should be called only once in 10 minutes");
}

@Test
void jStackDumpTest_dumpDirectoryNotExists_cannotBeCreatedTakeUserHome() {
final String dumpDirectory = dumpDirectoryCannotBeCreated();
Expand Down
2 changes: 1 addition & 1 deletion dubbo-config/dubbo-config-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.8</version>
<version>1.20.0</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public class ReferenceBean<T>
private MutablePropertyValues propertyValues;

// actual reference config
private ReferenceConfig referenceConfig;
private volatile ReferenceConfig referenceConfig;

// ReferenceBeanManager
private ReferenceBeanManager referenceBeanManager;
Expand Down Expand Up @@ -437,15 +437,22 @@ private void generateFromJdk(List<Class<?>> interfaces) {

private Object getCallProxy() throws Exception {
if (referenceConfig == null) {
referenceBeanManager.initReferenceBean(this);
applicationContext
.getBean(DubboConfigApplicationListener.class.getName(), DubboConfigApplicationListener.class)
.init();
logger.warn(
CONFIG_DUBBO_BEAN_INITIALIZER,
"",
"",
"ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
synchronized (LockUtils.getSingletonMutex(applicationContext)) {
if (referenceConfig == null) {
referenceBeanManager.initReferenceBean(this);
applicationContext
.getBean(
DubboConfigApplicationListener.class.getName(),
DubboConfigApplicationListener.class)
.init();
logger.warn(
CONFIG_DUBBO_BEAN_INITIALIZER,
"",
"",
"ReferenceBean is not ready yet, please make sure to "
+ "call reference interface method after dubbo is started.");
}
}
}
// get reference proxy
// Subclasses should synchronize on the given Object if they perform any sort of extended singleton creation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void onApplicationEvent(DubboConfigInitEvent event) {
}
}

public void init() {
public synchronized void init() {
// It's expected to be notified at
// org.springframework.context.support.AbstractApplicationContext.registerListeners(),
// before loading non-lazy singleton beans. At this moment, all BeanFactoryPostProcessor have been processed,
Expand Down
2 changes: 1 addition & 1 deletion dubbo-demo/dubbo-demo-spring-boot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<skip_maven_deploy>true</skip_maven_deploy>
<spring-boot.version>2.7.18</spring-boot.version>
<spring-boot-maven-plugin.version>2.7.18</spring-boot-maven-plugin.version>
<micrometer-core.version>1.13.1</micrometer-core.version>
<micrometer-core.version>1.13.2</micrometer-core.version>
</properties>

<dependencyManagement>
Expand Down
Loading

0 comments on commit 29ebef8

Please sign in to comment.