diff --git a/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java b/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java index 59ee8795cc70cdef5751a23d2ceda4281ff8ae97..ed5ab415cd2f2c0cd1b53293ba0e585e25dd0926 100644 --- a/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java +++ b/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java @@ -1,7 +1,9 @@ package org.red5.client.test; import java.io.File; +import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -13,11 +15,13 @@ import org.red5.io.ITag; import org.red5.io.ITagReader; import org.red5.io.flv.impl.FLVReader; import org.red5.io.utils.ObjectMap; +import org.red5.server.api.IConnection; import org.red5.server.api.event.IEvent; import org.red5.server.api.event.IEventDispatcher; import org.red5.server.api.service.IPendingServiceCall; import org.red5.server.api.service.IPendingServiceCallback; import org.red5.server.api.service.IServiceCall; +import org.red5.server.messaging.IMessage; import org.red5.server.net.rtmp.event.AudioData; import org.red5.server.net.rtmp.event.IRTMPEvent; import org.red5.server.net.rtmp.event.Invoke; @@ -40,7 +44,13 @@ public class PublisherConnectLoadTest { private static Logger log = LoggerFactory.getLogger(PublisherConnectLoadTest.class); - private ExecutorService executor; + private static ExecutorService executor = Executors.newCachedThreadPool(); + + public static int publishers = 100; + + private static CountDownLatch latch = new CountDownLatch(publishers); + + private static CopyOnWriteArrayList<RTMPClient> publisherList = new CopyOnWriteArrayList<>(); private ITagReader reader; @@ -52,48 +62,41 @@ public class PublisherConnectLoadTest { private String app = "live"; - public static int publishers = 100; - - private static CountDownLatch latch = new CountDownLatch(publishers); - static { System.setProperty("red5.deployment.type", "junit"); } public void setUp() throws Exception { - executor = Executors.newCachedThreadPool(); reader = new FLVReader(new File("/media/mondain/terrorbyte/Videos", "BladeRunner2049.flv")); - executor.submit(() -> { - while (reader.hasMoreTags()) { - ITag tag = reader.readTag(); - if (tag != null) { - IRTMPEvent msg; - switch (tag.getDataType()) { - case Constants.TYPE_AUDIO_DATA: - msg = new AudioData(tag.getBody()); - break; - case Constants.TYPE_VIDEO_DATA: - msg = new VideoData(tag.getBody()); - break; - case Constants.TYPE_INVOKE: - msg = new Invoke(tag.getBody()); - break; - case Constants.TYPE_NOTIFY: - msg = new Notify(tag.getBody()); - break; - default: - log.warn("Unexpected type? {}", tag.getDataType()); - msg = new Unknown(tag.getDataType(), tag.getBody()); - break; - } - msg.setTimestamp(tag.getTimestamp()); - que.add(RTMPMessage.build(msg)); - } else { - break; + while (reader.hasMoreTags()) { + ITag tag = reader.readTag(); + if (tag != null) { + IRTMPEvent msg; + switch (tag.getDataType()) { + case Constants.TYPE_AUDIO_DATA: + msg = new AudioData(tag.getBody()); + break; + case Constants.TYPE_VIDEO_DATA: + msg = new VideoData(tag.getBody()); + break; + case Constants.TYPE_INVOKE: + msg = new Invoke(tag.getBody()); + break; + case Constants.TYPE_NOTIFY: + msg = new Notify(tag.getBody()); + break; + default: + log.warn("Unexpected type? {}", tag.getDataType()); + msg = new Unknown(tag.getDataType(), tag.getBody()); + break; } + msg.setTimestamp(tag.getTimestamp()); + que.add(RTMPMessage.build(msg)); + } else { + break; } - log.info("Queue fill completed: {}", que.size()); - }); + } + log.info("Queue fill completed: {}", que.size()); } public void testLivePublish() throws InterruptedException { @@ -101,11 +104,12 @@ public class PublisherConnectLoadTest { log.info("Publisher load test: {}", publishName); final RTMPClient client = new RTMPClient(); client.setConnectionClosedHandler(() -> { - log.info("Test: {} - exit", publishName); + log.info("Connection closed: {}", publishName); }); client.setExceptionHandler(new ClientExceptionHandler() { @Override public void handleException(Throwable throwable) { + log.info("Exception caught: {}", publishName); throwable.printStackTrace(); disconnect(client); } @@ -113,13 +117,13 @@ public class PublisherConnectLoadTest { client.setStreamEventDispatcher(new IEventDispatcher() { @Override public void dispatchEvent(IEvent event) { - log.info("ClientStream: {} dispachEvent: {}", publishName, event); + log.info("Client: {} dispach event: {}", publishName, event); } }); final INetStreamEventHandler handler = new INetStreamEventHandler() { @Override public void onStreamEvent(Notify notify) { - log.info("ClientStream: {} onStreamEvent: {}", publishName, notify); + log.info("Client: {} onStreamEvent: {}", publishName, notify); IServiceCall call = notify.getCall(); if ("onStatus".equals(call.getServiceMethodName())) { @SuppressWarnings("rawtypes") @@ -130,6 +134,7 @@ public class PublisherConnectLoadTest { log.info("Publish success: {}", publishName); break; case "NetStream.UnPublish.Success": + log.info("Unpublish success: {}", publishName); case "NetStream.Publish.Failed": disconnect(client); break; @@ -137,45 +142,55 @@ public class PublisherConnectLoadTest { } } }; - executor.submit(() -> { - // set the handler - client.setStreamEventHandler(handler); - // connect - client.connect(host, port, app, new IPendingServiceCallback() { - @Override - public void resultReceived(IPendingServiceCall call) { - ObjectMap<?, ?> map = (ObjectMap<?, ?>) call.getResult(); - String code = (String) map.get("code"); - log.info("Response code: {} for {}", code, publishName); - if ("NetConnection.Connect.Rejected".equals(code)) { - log.warn("Rejected: {}", map.get("description")); - disconnect(client); - } else if ("NetConnection.Connect.Success".equals(code)) { - client.createStream(new IPendingServiceCallback() { - @Override - public void resultReceived(IPendingServiceCall call) { - double streamId = (Double) call.getResult(); - // live buffer 0.5s - client.publish(streamId, publishName, "live", handler); + // set the handler + client.setStreamEventHandler(handler); + // connect + client.connect(host, port, app, new IPendingServiceCallback() { + @Override + public void resultReceived(IPendingServiceCall call) { + ObjectMap<?, ?> map = (ObjectMap<?, ?>) call.getResult(); + String code = (String) map.get("code"); + log.info("Response code: {} for {}", code, publishName); + if ("NetConnection.Connect.Rejected".equals(code)) { + log.warn("Rejected: {} detail: {}", publishName, map.get("description")); + disconnect(client); + } else if ("NetConnection.Connect.Success".equals(code)) { + client.createStream(new IPendingServiceCallback() { + @Override + public void resultReceived(IPendingServiceCall call) { + double streamId = (Double) call.getResult(); + // live buffer 0.5s + client.publish(streamId, publishName, "live", handler); + // add to list + publisherList.add(client); + // publishing thread + executor.submit(() -> { + // get the underlying connection + IConnection conn = client.getConnection(); // publish stream data - que.forEach(message -> { + for (IMessage message : que) { if (message != null) { log.info("Publishing: {}", message); client.publishStreamData(streamId, message); } - }); + if (!conn.isConnected()) { + log.warn("Connection closed for: {} while publishing", publishName); + break; + } + } client.unpublish(streamId); disconnect(client); - } - }); - } + log.info("publish - end: {}", publishName); + }); + } + }); } - }); - log.info("test - end: {}", publishName); + } }); } public static void disconnect(RTMPClient client) { + log.info("Disconnecting: {}", client); client.disconnect(); latch.countDown(); } @@ -187,13 +202,27 @@ public class PublisherConnectLoadTest { } public static void main(String[] args) { + Random rnd = new Random(); PublisherConnectLoadTest test = new PublisherConnectLoadTest(); try { // set up test.setUp(); // launch publishers for (int i = 0; i < publishers; i++) { + // launch a publisher test test.testLivePublish(); + // our current publisher count of those with publish-success + int publisherCount = publisherList.size(); + log.info("Publisher count: {} index: {}", publisherCount, i); + // for every few publishers, kill one off randomly + if (i % 3 == 0 && publisherCount > 10) { + int index = rnd.nextInt(publisherCount); + log.info("Killing publisher at index: {} of {}", index, publisherCount); + RTMPClient client = publisherList.remove(index); + if (client != null) { + disconnect(client); + } + } } // wait for all to finish latch.await(); diff --git a/common/src/main/java/org/red5/server/scope/Scope.java b/common/src/main/java/org/red5/server/scope/Scope.java index f60b7a8da19ea9a4f85bbbddd75144bdfedec2d6..891e6bfec58c7f90a2a071bb7593db8780572310 100644 --- a/common/src/main/java/org/red5/server/scope/Scope.java +++ b/common/src/main/java/org/red5/server/scope/Scope.java @@ -79,6 +79,8 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope protected static Logger log = LoggerFactory.getLogger(Scope.class); + protected static boolean isDebug = log.isDebugEnabled(), isTrace = log.isTraceEnabled(); + /** * Unset flag constant */ @@ -1236,7 +1238,7 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope //for debugging public void dump() { - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Scope: {} {}", this.getClass().getName(), this); log.trace("Running: {}", running); if (hasParent()) { @@ -1434,11 +1436,11 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope * @return true if a matching scope is found, false otherwise */ public boolean hasName(String name) { - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("hasName: {}", name); } if (name != null) { - return stream().filter(child -> name.equals(child.getName())).findFirst().isPresent(); + return stream().filter(child -> child.getName().equals(name)).findFirst().isPresent(); } else { log.info("Invalid scope name, null is not allowed"); } @@ -1469,7 +1471,7 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope Optional<IBasicScope> scope = null; // skip type check? if (ScopeType.UNDEFINED.equals(type)) { - scope = stream().filter(child -> name.equals(child.getName())).findFirst(); + scope = stream().filter(child -> child.getName().equals(name)).findFirst(); } else { // if its broadcast type then allow an alias match in addition to the name match if (ScopeType.BROADCAST.equals(type)) { @@ -1477,10 +1479,11 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope for (IBasicScope child : this) { // ensure type is broadcast type, since we'll pull out a cbs if (child.getType().equals(type)) { + String childName = child.getName(); IClientBroadcastStream cbs = ((IBroadcastScope) child).getClientBroadcastStream(); if (cbs != null) { String pubName = cbs.getPublishedName(); - if (name.equals(child.getName())) { + if (childName.equals(name)) { log.debug("Scope found by name: {} on {}", name, pubName); return child; } else if (cbs.containsAlias(name)) { @@ -1491,7 +1494,7 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope } } else { //log.debug("Broadcast scope: {} has no stream attached", name); - if (name.equals(child.getName())) { + if (childName.equals(name)) { log.debug("Scope found by name: {} but has no stream", name); return child; } @@ -1499,7 +1502,7 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope } } } else { - scope = stream().filter(child -> child.getType().equals(type) && name.equals(child.getName())).findFirst(); + scope = stream().filter(child -> child.getType().equals(type) && child.getName().equals(name)).findFirst(); } } if (scope != null && scope.isPresent()) { diff --git a/common/src/main/java/org/red5/server/stream/ClientBroadcastStream.java b/common/src/main/java/org/red5/server/stream/ClientBroadcastStream.java index a080f81017e16b96059d2378434d2b4509afcca9..0cd5f4921cecfddc220dd9c540e1bfd59e6ca05f 100644 --- a/common/src/main/java/org/red5/server/stream/ClientBroadcastStream.java +++ b/common/src/main/java/org/red5/server/stream/ClientBroadcastStream.java @@ -177,7 +177,7 @@ public class ClientBroadcastStream extends AbstractClientStream implements IClie /** * Whether or not to register with JMX. */ - protected boolean registerJMX = true; + protected boolean registerJMX; /** * Stream name aliases for the entire server instance. @@ -968,7 +968,7 @@ public class ClientBroadcastStream extends AbstractClientStream implements IClie } protected void registerJMX() { - if (registerJMX) { + if (registerJMX && StringUtils.isNotEmpty(publishedName) && !"false".equals(publishedName)) { // register with jmx MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { diff --git a/common/src/main/java/org/red5/server/stream/StreamService.java b/common/src/main/java/org/red5/server/stream/StreamService.java index 610635dee2d24a3bf1ebe768310c4dd2eff7c5bf..a345d016903841bc972959261e9411027f37e6a1 100644 --- a/common/src/main/java/org/red5/server/stream/StreamService.java +++ b/common/src/main/java/org/red5/server/stream/StreamService.java @@ -849,7 +849,10 @@ public class StreamService implements IStreamService { * @return Broadcast scope */ public IBroadcastScope getBroadcastScope(IScope scope, String name) { - return scope.getBroadcastScope(name); + if (scope != null) { + return scope.getBroadcastScope(name); + } + return null; } /**