diff --git a/client/src/main/java/org/red5/client/net/rtmp/BaseRTMPClientHandler.java b/client/src/main/java/org/red5/client/net/rtmp/BaseRTMPClientHandler.java index cc0ca9e61c66dc22d3a61d0fc8dca90a682ac03c..3ab7436aa452c4e57a10540b4371a27a76ac6805 100644 --- a/client/src/main/java/org/red5/client/net/rtmp/BaseRTMPClientHandler.java +++ b/client/src/main/java/org/red5/client/net/rtmp/BaseRTMPClientHandler.java @@ -11,6 +11,7 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -97,7 +98,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I /** * Net stream handling */ - private volatile ConcurrentMap<Number, NetStreamPrivateData> streamDataMap = new ConcurrentHashMap<>(3, 0.75f, 1); + private volatile CopyOnWriteArrayList<NetStreamPrivateData> streamDataList = new CopyOnWriteArrayList<>(); /** * Task to start on connection close @@ -541,7 +542,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I public void disconnect() { log.debug("disconnect: {}", conn); if (conn != null) { - streamDataMap.clear(); + streamDataList.clear(); conn.close(); } } @@ -573,10 +574,10 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I @Override public void publish(Number streamId, String name, String mode, INetStreamEventHandler handler) { - log.debug("publish - stream id: {}, name: {}, mode: {}", new Object[] { streamId, name, mode }); + log.debug("publish - stream id: {}, name: {}, mode: {}", streamId, name, mode); // setup the netstream handler if (handler != null) { - NetStreamPrivateData streamData = streamDataMap.get(streamId); + NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId.doubleValue())).findFirst().orElse(null); if (streamData != null) { log.debug("Setting handler on stream data - handler: {}", handler); streamData.handler = handler; @@ -597,13 +598,17 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I public void unpublish(Number streamId) { log.debug("unpublish stream {}", streamId); PendingCall pendingCall = new PendingCall("publish", new Object[] { false }); + // this cannot handle a null streamId, so force to 1.0 if null + if (streamId == null) { + streamId = 1.0; + } conn.invoke(pendingCall, getChannelForStreamId(streamId)); } @Override public void publishStreamData(Number streamId, IMessage message) { - NetStreamPrivateData streamData = streamDataMap.get(streamId); - log.debug("publishStreamData - stream data map: {}", streamDataMap); + // get the stream data by index of the list + NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId.doubleValue())).findFirst().orElse(null); if (streamData != null) { if (streamData.connConsumer != null) { streamData.connConsumer.pushMessage(null, message); @@ -611,7 +616,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I log.warn("Connection consumer was not found for stream id: {}", streamId); } } else { - log.warn("Stream data not found for stream id: {}", streamId); + log.warn("Stream data not found for stream id: {} in {}", streamId, streamDataList); } } @@ -793,7 +798,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I boolean onStatus = "onStatus".equals(methodName); if (onStatus) { log.debug("onStatus"); - Number streamId = source.getStreamId(); + Number streamId = (Number) (source.getStreamId() != null ? source.getStreamId().doubleValue() : 1.0d); if (log.isDebugEnabled()) { log.debug("Stream id from header: {}", streamId); // XXX create better to serialize ObjectMap to Status object @@ -802,26 +807,11 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I log.debug("Client id from status: {}", objMap.get("clientid")); } if (streamId != null) { - // try lookup by stream id - NetStreamPrivateData streamData = streamDataMap.get(streamId); - // if null return the first one in the map - if (streamData == null) { - if (!streamDataMap.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("Stream data was not found by id. Map contents: {}", streamDataMap); - } - if (streamId instanceof Integer) { - streamData = streamDataMap.get(streamId.intValue() * 1.0d); - } else { - // just pass back the first item - streamData = streamDataMap.values().iterator().next(); - } - } - } + // try lookup by stream id, if null return the first one + NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId)).findFirst().orElse(streamDataList.get(0)); if (streamData == null) { log.warn("Stream data was null for id: {}", streamId); - } - if (streamData != null && streamData.handler != null) { + } else if (streamData.handler != null) { log.debug("Got stream data and handler"); streamData.handler.onStreamEvent((Notify) command); } @@ -1036,40 +1026,41 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I public void resultReceived(IPendingServiceCall call) { // get the result as base object Object callResult = call.getResult(); - // we expect a number consisting of the stream id, but we'll check for an object map as well - Number streamId = null; - if (callResult instanceof Number) { - streamId = (Number) callResult; - } else if (callResult instanceof Map) { - Map<?, ?> map = (Map<?, ?>) callResult; - // XXX(paul) log out the map contents - log.warn("CreateStreamCallBack resultReceived - map: {}", map); - if (map.containsKey("streamId")) { - Object tmpStreamId = map.get("streamId"); - if (tmpStreamId instanceof Number) { - streamId = (Number) tmpStreamId; - } else { - log.warn("CreateStreamCallBack resultReceived - stream id is not a number: {}", tmpStreamId); + if (callResult != null) { + // we expect a number consisting of the stream id, but we'll check for an object map as well + Number streamId = null; + if (callResult instanceof Number) { + streamId = (Number) callResult; + } else if (callResult instanceof Map) { + Map<?, ?> map = (Map<?, ?>) callResult; + // XXX(paul) log out the map contents + log.warn("CreateStreamCallBack resultReceived - map: {}", map); + if (map.containsKey("streamId")) { + Object tmpStreamId = map.get("streamId"); + if (tmpStreamId instanceof Number) { + streamId = (Number) tmpStreamId; + } else { + log.warn("CreateStreamCallBack resultReceived - stream id is not a number: {}", tmpStreamId); + } } } - } else if (callResult == null) { + log.debug("CreateStreamCallBack resultReceived - stream id: {} call: {} connection: {}", streamId, call, conn); + if (conn != null && streamId != null) { + log.debug("Setting new net stream"); + NetStream stream = new NetStream(streamEventDispatcher); + stream.setConnection(conn); + stream.setStreamId(streamId); + conn.addClientStream(stream); + NetStreamPrivateData streamData = new NetStreamPrivateData(streamId); + streamData.outputStream = conn.createOutputStream(streamId); + streamData.connConsumer = new ConnectionConsumer(conn, streamData.outputStream.getVideo(), streamData.outputStream.getAudio(), streamData.outputStream.getData()); + streamDataList.add(streamData); + log.debug("streamDataList: {}", streamDataList); + } + wrapped.resultReceived(call); + } else { log.warn("CreateStreamCallBack resultReceived - call result is null"); - return; - } - log.debug("CreateStreamCallBack resultReceived - stream id: {} call: {} connection: {}", streamId, call, conn); - if (conn != null && streamId != null) { - log.debug("Setting new net stream"); - NetStream stream = new NetStream(streamEventDispatcher); - stream.setConnection(conn); - stream.setStreamId(streamId); - conn.addClientStream(stream); - NetStreamPrivateData streamData = new NetStreamPrivateData(); - streamData.outputStream = conn.createOutputStream(streamId); - streamData.connConsumer = new ConnectionConsumer(conn, streamData.outputStream.getVideo(), streamData.outputStream.getAudio(), streamData.outputStream.getData()); - streamDataMap.put(streamId, streamData); - log.debug("streamDataMap: {}", streamDataMap); } - wrapped.resultReceived(call); } } @@ -1101,34 +1092,25 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I public void resultReceived(IPendingServiceCall call) { // get the result as base object Object callResult = call.getResult(); - // we expect a number consisting of the stream id, but we'll check for an object map as well - Number streamId = null; - if (callResult instanceof Number) { - streamId = (Number) callResult; - } else if (callResult instanceof Map) { - Map<?, ?> map = (Map<?, ?>) callResult; - if (map.containsKey("streamId")) { - Object tmpStreamId = map.get("streamId"); - if (tmpStreamId instanceof Number) { - streamId = (Number) tmpStreamId; + if (callResult != null) { + // we expect a number consisting of the stream id, but we'll check for an object map as well + final Number streamId = (Number) (callResult instanceof Number ? callResult : (callResult instanceof Map ? ((Map<?, ?>) callResult).get("streamId") : 1.0)); + log.debug("DeleteStreamCallBack resultReceived - stream id: {} call: {} connection: {}", streamId, call, conn); + if (conn != null) { + log.debug("Deleting net stream"); + conn.removeClientStream(streamId); + // send a delete notify? + NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId)).findFirst().orElse(null); + if (streamData != null) { + streamDataList.remove(streamData); } else { - log.warn("DeleteStreamCallBack resultReceived - stream id is not a number: {}", tmpStreamId); + log.warn("Stream data not found for stream id: {}", streamId); } } - } else if (callResult == null) { + wrapped.resultReceived(call); + } else { log.warn("DeleteStreamCallBack resultReceived - call result is null"); - return; - } - log.debug("DeleteStreamCallBack resultReceived - stream id: {} call: {} connection: {}", streamId, call, conn); - if (conn != null && streamId != null) { - log.debug("Deleting net stream"); - conn.removeClientStream(streamId); - // send a delete notify? - //NetStreamPrivateData streamData = streamDataMap.get(streamId); - //streamData.handler.onStreamEvent(notify) - streamDataMap.remove(streamId); } - wrapped.resultReceived(call); } } @@ -1166,12 +1148,24 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I public volatile ConnectionConsumer connConsumer; - { + final Number streamId; + + NetStreamPrivateData(Number streamId) { + this.streamId = streamId; if (streamEventHandler != null) { handler = streamEventHandler; } } + public Number getStreamId() { + return streamId; + } + + @Override + public int hashCode() { + return streamId.hashCode(); + } + } } diff --git a/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java b/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java index ed5ab415cd2f2c0cd1b53293ba0e585e25dd0926..fdedaa50a32c68ba3c9e02e72473fb209cee0e92 100644 --- a/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java +++ b/client/src/main/java/org/red5/client/test/PublisherConnectLoadTest.java @@ -7,6 +7,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.red5.client.net.rtmp.ClientExceptionHandler; import org.red5.client.net.rtmp.INetStreamEventHandler; @@ -15,13 +17,12 @@ import org.red5.io.ITag; import org.red5.io.ITagReader; import org.red5.io.flv.impl.FLVReader; import org.red5.io.utils.ObjectMap; -import org.red5.server.api.IConnection; import org.red5.server.api.event.IEvent; import org.red5.server.api.event.IEventDispatcher; import org.red5.server.api.service.IPendingServiceCall; import org.red5.server.api.service.IPendingServiceCallback; import org.red5.server.api.service.IServiceCall; -import org.red5.server.messaging.IMessage; +import org.red5.server.net.rtmp.RTMPConnection; import org.red5.server.net.rtmp.event.AudioData; import org.red5.server.net.rtmp.event.IRTMPEvent; import org.red5.server.net.rtmp.event.Invoke; @@ -44,14 +45,21 @@ public class PublisherConnectLoadTest { private static Logger log = LoggerFactory.getLogger(PublisherConnectLoadTest.class); + private static Random rnd = new Random(); + private static ExecutorService executor = Executors.newCachedThreadPool(); - public static int publishers = 100; + public static int publishers = 30; private static CountDownLatch latch = new CountDownLatch(publishers); private static CopyOnWriteArrayList<RTMPClient> publisherList = new CopyOnWriteArrayList<>(); + private static AtomicIntegerFieldUpdater<PublisherConnectLoadTest> AtomicPublishCounter = AtomicIntegerFieldUpdater.newUpdater(PublisherConnectLoadTest.class, "publishCount"); + + // updated atomically as a counter since the publish list is weakly consistent in terms of size + private volatile int publishCount; + private ITagReader reader; private ConcurrentLinkedQueue<RTMPMessage> que = new ConcurrentLinkedQueue<>(); @@ -99,8 +107,8 @@ public class PublisherConnectLoadTest { log.info("Queue fill completed: {}", que.size()); } - public void testLivePublish() throws InterruptedException { - final String publishName = String.format("stream%d", System.nanoTime()); + public void testLivePublish(int i) throws InterruptedException { + final String publishName = String.format("stream%d", i); log.info("Publisher load test: {}", publishName); final RTMPClient client = new RTMPClient(); client.setConnectionClosedHandler(() -> { @@ -130,8 +138,12 @@ public class PublisherConnectLoadTest { ObjectMap status = ((ObjectMap) call.getArguments()[0]); String code = (String) status.get("code"); switch (code) { - case "NetStream.Publish.Success": + case "NetStream.Publish.Start": log.info("Publish success: {}", publishName); + // do publishing + startPublish(client, publishName); + // randomly decide if a publisher should be killed + maybeKillPublisher(); break; case "NetStream.UnPublish.Success": log.info("Unpublish success: {}", publishName); @@ -141,56 +153,105 @@ public class PublisherConnectLoadTest { } } } + }; // set the handler client.setStreamEventHandler(handler); // connect - client.connect(host, port, app, new IPendingServiceCallback() { - @Override - public void resultReceived(IPendingServiceCall call) { - ObjectMap<?, ?> map = (ObjectMap<?, ?>) call.getResult(); - String code = (String) map.get("code"); - log.info("Response code: {} for {}", code, publishName); - if ("NetConnection.Connect.Rejected".equals(code)) { - log.warn("Rejected: {} detail: {}", publishName, map.get("description")); - disconnect(client); - } else if ("NetConnection.Connect.Success".equals(code)) { - client.createStream(new IPendingServiceCallback() { - @Override - public void resultReceived(IPendingServiceCall call) { - double streamId = (Double) call.getResult(); - // live buffer 0.5s - client.publish(streamId, publishName, "live", handler); - // add to list - publisherList.add(client); - // publishing thread - executor.submit(() -> { - // get the underlying connection - IConnection conn = client.getConnection(); - // publish stream data - for (IMessage message : que) { - if (message != null) { - log.info("Publishing: {}", message); - client.publishStreamData(streamId, message); - } - if (!conn.isConnected()) { - log.warn("Connection closed for: {} while publishing", publishName); - break; - } - } - client.unpublish(streamId); - disconnect(client); - log.info("publish - end: {}", publishName); - }); - } - }); + executor.submit(() -> { + client.connect(host, port, app, new IPendingServiceCallback() { + @Override + public void resultReceived(IPendingServiceCall call) { + ObjectMap<?, ?> map = (ObjectMap<?, ?>) call.getResult(); + String code = (String) map.get("code"); + log.info("Response code: {} for {}", code, publishName); + if ("NetConnection.Connect.Rejected".equals(code)) { + log.warn("Rejected: {} detail: {}", publishName, map.get("description")); + disconnect(client); + } else if ("NetConnection.Connect.Success".equals(code)) { + client.createStream(new IPendingServiceCallback() { + @Override + public void resultReceived(IPendingServiceCall call) { + Number streamId = (Number) call.getResult(); + log.info("Create for publish: {} with stream id: {}", publishName, streamId); + client.publish(streamId, publishName, "live", handler); + } + }); + } } - } + }); + }); + } + + public void startPublish(RTMPClient client, String publishName) { + log.info("Start publish: {} name: {}", client, publishName); + // add to list + if (publisherList.add(client)) { + // increment the counter + AtomicPublishCounter.incrementAndGet(this); + } + // publishing thread + executor.submit(() -> { + // get the underlying connection + final RTMPConnection conn = client.getConnection(); + final Number streamId = conn.getStreamId() == null ? 1.0d : conn.getStreamId(); + log.info("Publishing: {} stream id: {}", publishName, streamId); + AtomicInteger messageCounter = new AtomicInteger(); + // publish stream data + que.spliterator().forEachRemaining(msg -> { + if (msg != null) { + log.trace("Publishing: {}", msg); + client.publishStreamData(streamId, msg); + messageCounter.incrementAndGet(); + } else { + log.warn("Null message for: {}", publishName); + } + try { + Thread.sleep(13L); + } catch (InterruptedException e) { + } + // TODO(paul) looking to why its always disconnected + /* + // check for disconnect + if (conn.isDisconnected()) { + log.warn("Connection is disconnected for: {} while publishing", publishName); + return; + } + */ + }); + // unpublish + client.unpublish(streamId); + disconnect(client); + log.info("Publishing completed: {} with {} messages published", publishName, messageCounter.get()); }); } - public static void disconnect(RTMPClient client) { + public void maybeKillPublisher() { + // our current publisher count of those with publish-success + log.info("Publisher count: {}", publishCount); + // for every few publishers, kill one off randomly + if (publishCount > (publishers / 3)) { + int index = rnd.nextInt(publishCount); + if (index % 3 == 0) { + log.info("Killing publisher at index: {} of {}", index, publishCount); + RTMPClient client = publisherList.get(index); + if (client != null) { + Number streamId = client.getConnection().getStreamId(); + log.info("Unpublishing: {} stream id: {}", client, streamId); + client.unpublish(streamId); + } + } + } + } + + public void disconnect(RTMPClient client) { log.info("Disconnecting: {}", client); + // ensure the client is removed from the list + if (publisherList.remove(client)) { + AtomicPublishCounter.decrementAndGet(this); + } else { + log.info("Publisher already removed or was not publishing: {}", client); + } client.disconnect(); latch.countDown(); } @@ -199,10 +260,14 @@ public class PublisherConnectLoadTest { reader.close(); que.clear(); ExecutorServiceUtil.shutdown(executor); + publisherList.clear(); + } + + public int getPublishCount() { + return publishCount; } public static void main(String[] args) { - Random rnd = new Random(); PublisherConnectLoadTest test = new PublisherConnectLoadTest(); try { // set up @@ -210,19 +275,7 @@ public class PublisherConnectLoadTest { // launch publishers for (int i = 0; i < publishers; i++) { // launch a publisher test - test.testLivePublish(); - // our current publisher count of those with publish-success - int publisherCount = publisherList.size(); - log.info("Publisher count: {} index: {}", publisherCount, i); - // for every few publishers, kill one off randomly - if (i % 3 == 0 && publisherCount > 10) { - int index = rnd.nextInt(publisherCount); - log.info("Killing publisher at index: {} of {}", index, publisherCount); - RTMPClient client = publisherList.remove(index); - if (client != null) { - disconnect(client); - } - } + test.testLivePublish(i); } // wait for all to finish latch.await(); @@ -230,6 +283,8 @@ public class PublisherConnectLoadTest { test.tearDown(); } catch (Exception e) { log.warn("Exception", e); + } finally { + log.info("Done"); } } diff --git a/client/src/test/java/org/red5/client/net/rtmp/FBLiveConnectTest.java b/client/src/test/java/org/red5/client/net/rtmp/FBLiveConnectTest.java index e49f5498afe4490bc386ca138912eface44c0de8..1c29b47d655c0d8ab6e461fce76d0abd3790ddbb 100644 --- a/client/src/test/java/org/red5/client/net/rtmp/FBLiveConnectTest.java +++ b/client/src/test/java/org/red5/client/net/rtmp/FBLiveConnectTest.java @@ -143,7 +143,7 @@ public class FBLiveConnectTest { ObjectMap status = ((ObjectMap) call.getArguments()[0]); String code = (String) status.get("code"); switch (code) { - case "NetStream.Publish.Success": + case "NetStream.Publish.Start": publishing = true; break; case "NetStream.UnPublish.Success":