Skip to content
Snippets Groups Projects
Commit 2d0baecf authored by Paul Gregoire's avatar Paul Gregoire
Browse files

Rework how RTMPClient private stream data is accessed by stream id

parent e2821baa
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
......@@ -97,7 +98,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
/**
* Net stream handling
*/
private volatile ConcurrentMap<Number, NetStreamPrivateData> streamDataMap = new ConcurrentHashMap<>(3, 0.75f, 1);
private volatile CopyOnWriteArrayList<NetStreamPrivateData> streamDataList = new CopyOnWriteArrayList<>();
/**
* Task to start on connection close
......@@ -541,7 +542,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
public void disconnect() {
log.debug("disconnect: {}", conn);
if (conn != null) {
streamDataMap.clear();
streamDataList.clear();
conn.close();
}
}
......@@ -573,10 +574,10 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
@Override
public void publish(Number streamId, String name, String mode, INetStreamEventHandler handler) {
log.debug("publish - stream id: {}, name: {}, mode: {}", new Object[] { streamId, name, mode });
log.debug("publish - stream id: {}, name: {}, mode: {}", streamId, name, mode);
// setup the netstream handler
if (handler != null) {
NetStreamPrivateData streamData = streamDataMap.get(streamId);
NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId.doubleValue())).findFirst().orElse(null);
if (streamData != null) {
log.debug("Setting handler on stream data - handler: {}", handler);
streamData.handler = handler;
......@@ -597,13 +598,17 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
public void unpublish(Number streamId) {
log.debug("unpublish stream {}", streamId);
PendingCall pendingCall = new PendingCall("publish", new Object[] { false });
// this cannot handle a null streamId, so force to 1.0 if null
if (streamId == null) {
streamId = 1.0;
}
conn.invoke(pendingCall, getChannelForStreamId(streamId));
}
@Override
public void publishStreamData(Number streamId, IMessage message) {
NetStreamPrivateData streamData = streamDataMap.get(streamId);
log.debug("publishStreamData - stream data map: {}", streamDataMap);
// get the stream data by index of the list
NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId.doubleValue())).findFirst().orElse(null);
if (streamData != null) {
if (streamData.connConsumer != null) {
streamData.connConsumer.pushMessage(null, message);
......@@ -611,7 +616,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
log.warn("Connection consumer was not found for stream id: {}", streamId);
}
} else {
log.warn("Stream data not found for stream id: {}", streamId);
log.warn("Stream data not found for stream id: {} in {}", streamId, streamDataList);
}
}
......@@ -793,7 +798,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
boolean onStatus = "onStatus".equals(methodName);
if (onStatus) {
log.debug("onStatus");
Number streamId = source.getStreamId();
Number streamId = (Number) (source.getStreamId() != null ? source.getStreamId().doubleValue() : 1.0d);
if (log.isDebugEnabled()) {
log.debug("Stream id from header: {}", streamId);
// XXX create better to serialize ObjectMap to Status object
......@@ -802,26 +807,11 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
log.debug("Client id from status: {}", objMap.get("clientid"));
}
if (streamId != null) {
// try lookup by stream id
NetStreamPrivateData streamData = streamDataMap.get(streamId);
// if null return the first one in the map
if (streamData == null) {
if (!streamDataMap.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Stream data was not found by id. Map contents: {}", streamDataMap);
}
if (streamId instanceof Integer) {
streamData = streamDataMap.get(streamId.intValue() * 1.0d);
} else {
// just pass back the first item
streamData = streamDataMap.values().iterator().next();
}
}
}
// try lookup by stream id, if null return the first one
NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId)).findFirst().orElse(streamDataList.get(0));
if (streamData == null) {
log.warn("Stream data was null for id: {}", streamId);
}
if (streamData != null && streamData.handler != null) {
} else if (streamData.handler != null) {
log.debug("Got stream data and handler");
streamData.handler.onStreamEvent((Notify) command);
}
......@@ -1036,6 +1026,7 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
public void resultReceived(IPendingServiceCall call) {
// get the result as base object
Object callResult = call.getResult();
if (callResult != null) {
// we expect a number consisting of the stream id, but we'll check for an object map as well
Number streamId = null;
if (callResult instanceof Number) {
......@@ -1052,9 +1043,6 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
log.warn("CreateStreamCallBack resultReceived - stream id is not a number: {}", tmpStreamId);
}
}
} else if (callResult == null) {
log.warn("CreateStreamCallBack resultReceived - call result is null");
return;
}
log.debug("CreateStreamCallBack resultReceived - stream id: {} call: {} connection: {}", streamId, call, conn);
if (conn != null && streamId != null) {
......@@ -1063,13 +1051,16 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
stream.setConnection(conn);
stream.setStreamId(streamId);
conn.addClientStream(stream);
NetStreamPrivateData streamData = new NetStreamPrivateData();
NetStreamPrivateData streamData = new NetStreamPrivateData(streamId);
streamData.outputStream = conn.createOutputStream(streamId);
streamData.connConsumer = new ConnectionConsumer(conn, streamData.outputStream.getVideo(), streamData.outputStream.getAudio(), streamData.outputStream.getData());
streamDataMap.put(streamId, streamData);
log.debug("streamDataMap: {}", streamDataMap);
streamDataList.add(streamData);
log.debug("streamDataList: {}", streamDataList);
}
wrapped.resultReceived(call);
} else {
log.warn("CreateStreamCallBack resultReceived - call result is null");
}
}
}
......@@ -1101,34 +1092,25 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
public void resultReceived(IPendingServiceCall call) {
// get the result as base object
Object callResult = call.getResult();
if (callResult != null) {
// we expect a number consisting of the stream id, but we'll check for an object map as well
Number streamId = null;
if (callResult instanceof Number) {
streamId = (Number) callResult;
} else if (callResult instanceof Map) {
Map<?, ?> map = (Map<?, ?>) callResult;
if (map.containsKey("streamId")) {
Object tmpStreamId = map.get("streamId");
if (tmpStreamId instanceof Number) {
streamId = (Number) tmpStreamId;
} else {
log.warn("DeleteStreamCallBack resultReceived - stream id is not a number: {}", tmpStreamId);
}
}
} else if (callResult == null) {
log.warn("DeleteStreamCallBack resultReceived - call result is null");
return;
}
final Number streamId = (Number) (callResult instanceof Number ? callResult : (callResult instanceof Map ? ((Map<?, ?>) callResult).get("streamId") : 1.0));
log.debug("DeleteStreamCallBack resultReceived - stream id: {} call: {} connection: {}", streamId, call, conn);
if (conn != null && streamId != null) {
if (conn != null) {
log.debug("Deleting net stream");
conn.removeClientStream(streamId);
// send a delete notify?
//NetStreamPrivateData streamData = streamDataMap.get(streamId);
//streamData.handler.onStreamEvent(notify)
streamDataMap.remove(streamId);
NetStreamPrivateData streamData = streamDataList.stream().filter(s -> s.getStreamId().equals(streamId)).findFirst().orElse(null);
if (streamData != null) {
streamDataList.remove(streamData);
} else {
log.warn("Stream data not found for stream id: {}", streamId);
}
}
wrapped.resultReceived(call);
} else {
log.warn("DeleteStreamCallBack resultReceived - call result is null");
}
}
}
......@@ -1166,12 +1148,24 @@ public abstract class BaseRTMPClientHandler extends BaseRTMPHandler implements I
public volatile ConnectionConsumer connConsumer;
{
final Number streamId;
NetStreamPrivateData(Number streamId) {
this.streamId = streamId;
if (streamEventHandler != null) {
handler = streamEventHandler;
}
}
public Number getStreamId() {
return streamId;
}
@Override
public int hashCode() {
return streamId.hashCode();
}
}
}
......@@ -7,6 +7,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.red5.client.net.rtmp.ClientExceptionHandler;
import org.red5.client.net.rtmp.INetStreamEventHandler;
......@@ -15,13 +17,12 @@ import org.red5.io.ITag;
import org.red5.io.ITagReader;
import org.red5.io.flv.impl.FLVReader;
import org.red5.io.utils.ObjectMap;
import org.red5.server.api.IConnection;
import org.red5.server.api.event.IEvent;
import org.red5.server.api.event.IEventDispatcher;
import org.red5.server.api.service.IPendingServiceCall;
import org.red5.server.api.service.IPendingServiceCallback;
import org.red5.server.api.service.IServiceCall;
import org.red5.server.messaging.IMessage;
import org.red5.server.net.rtmp.RTMPConnection;
import org.red5.server.net.rtmp.event.AudioData;
import org.red5.server.net.rtmp.event.IRTMPEvent;
import org.red5.server.net.rtmp.event.Invoke;
......@@ -44,14 +45,21 @@ public class PublisherConnectLoadTest {
private static Logger log = LoggerFactory.getLogger(PublisherConnectLoadTest.class);
private static Random rnd = new Random();
private static ExecutorService executor = Executors.newCachedThreadPool();
public static int publishers = 100;
public static int publishers = 30;
private static CountDownLatch latch = new CountDownLatch(publishers);
private static CopyOnWriteArrayList<RTMPClient> publisherList = new CopyOnWriteArrayList<>();
private static AtomicIntegerFieldUpdater<PublisherConnectLoadTest> AtomicPublishCounter = AtomicIntegerFieldUpdater.newUpdater(PublisherConnectLoadTest.class, "publishCount");
// updated atomically as a counter since the publish list is weakly consistent in terms of size
private volatile int publishCount;
private ITagReader reader;
private ConcurrentLinkedQueue<RTMPMessage> que = new ConcurrentLinkedQueue<>();
......@@ -99,8 +107,8 @@ public class PublisherConnectLoadTest {
log.info("Queue fill completed: {}", que.size());
}
public void testLivePublish() throws InterruptedException {
final String publishName = String.format("stream%d", System.nanoTime());
public void testLivePublish(int i) throws InterruptedException {
final String publishName = String.format("stream%d", i);
log.info("Publisher load test: {}", publishName);
final RTMPClient client = new RTMPClient();
client.setConnectionClosedHandler(() -> {
......@@ -130,8 +138,12 @@ public class PublisherConnectLoadTest {
ObjectMap status = ((ObjectMap) call.getArguments()[0]);
String code = (String) status.get("code");
switch (code) {
case "NetStream.Publish.Success":
case "NetStream.Publish.Start":
log.info("Publish success: {}", publishName);
// do publishing
startPublish(client, publishName);
// randomly decide if a publisher should be killed
maybeKillPublisher();
break;
case "NetStream.UnPublish.Success":
log.info("Unpublish success: {}", publishName);
......@@ -141,10 +153,12 @@ public class PublisherConnectLoadTest {
}
}
}
};
// set the handler
client.setStreamEventHandler(handler);
// connect
executor.submit(() -> {
client.connect(host, port, app, new IPendingServiceCallback() {
@Override
public void resultReceived(IPendingServiceCall call) {
......@@ -158,39 +172,86 @@ public class PublisherConnectLoadTest {
client.createStream(new IPendingServiceCallback() {
@Override
public void resultReceived(IPendingServiceCall call) {
double streamId = (Double) call.getResult();
// live buffer 0.5s
Number streamId = (Number) call.getResult();
log.info("Create for publish: {} with stream id: {}", publishName, streamId);
client.publish(streamId, publishName, "live", handler);
}
});
}
}
});
});
}
public void startPublish(RTMPClient client, String publishName) {
log.info("Start publish: {} name: {}", client, publishName);
// add to list
publisherList.add(client);
if (publisherList.add(client)) {
// increment the counter
AtomicPublishCounter.incrementAndGet(this);
}
// publishing thread
executor.submit(() -> {
// get the underlying connection
IConnection conn = client.getConnection();
final RTMPConnection conn = client.getConnection();
final Number streamId = conn.getStreamId() == null ? 1.0d : conn.getStreamId();
log.info("Publishing: {} stream id: {}", publishName, streamId);
AtomicInteger messageCounter = new AtomicInteger();
// publish stream data
for (IMessage message : que) {
if (message != null) {
log.info("Publishing: {}", message);
client.publishStreamData(streamId, message);
que.spliterator().forEachRemaining(msg -> {
if (msg != null) {
log.trace("Publishing: {}", msg);
client.publishStreamData(streamId, msg);
messageCounter.incrementAndGet();
} else {
log.warn("Null message for: {}", publishName);
}
if (!conn.isConnected()) {
log.warn("Connection closed for: {} while publishing", publishName);
break;
try {
Thread.sleep(13L);
} catch (InterruptedException e) {
}
// TODO(paul) looking to why its always disconnected
/*
// check for disconnect
if (conn.isDisconnected()) {
log.warn("Connection is disconnected for: {} while publishing", publishName);
return;
}
*/
});
// unpublish
client.unpublish(streamId);
disconnect(client);
log.info("publish - end: {}", publishName);
log.info("Publishing completed: {} with {} messages published", publishName, messageCounter.get());
});
}
});
public void maybeKillPublisher() {
// our current publisher count of those with publish-success
log.info("Publisher count: {}", publishCount);
// for every few publishers, kill one off randomly
if (publishCount > (publishers / 3)) {
int index = rnd.nextInt(publishCount);
if (index % 3 == 0) {
log.info("Killing publisher at index: {} of {}", index, publishCount);
RTMPClient client = publisherList.get(index);
if (client != null) {
Number streamId = client.getConnection().getStreamId();
log.info("Unpublishing: {} stream id: {}", client, streamId);
client.unpublish(streamId);
}
}
}
});
}
public static void disconnect(RTMPClient client) {
public void disconnect(RTMPClient client) {
log.info("Disconnecting: {}", client);
// ensure the client is removed from the list
if (publisherList.remove(client)) {
AtomicPublishCounter.decrementAndGet(this);
} else {
log.info("Publisher already removed or was not publishing: {}", client);
}
client.disconnect();
latch.countDown();
}
......@@ -199,10 +260,14 @@ public class PublisherConnectLoadTest {
reader.close();
que.clear();
ExecutorServiceUtil.shutdown(executor);
publisherList.clear();
}
public int getPublishCount() {
return publishCount;
}
public static void main(String[] args) {
Random rnd = new Random();
PublisherConnectLoadTest test = new PublisherConnectLoadTest();
try {
// set up
......@@ -210,19 +275,7 @@ public class PublisherConnectLoadTest {
// launch publishers
for (int i = 0; i < publishers; i++) {
// launch a publisher test
test.testLivePublish();
// our current publisher count of those with publish-success
int publisherCount = publisherList.size();
log.info("Publisher count: {} index: {}", publisherCount, i);
// for every few publishers, kill one off randomly
if (i % 3 == 0 && publisherCount > 10) {
int index = rnd.nextInt(publisherCount);
log.info("Killing publisher at index: {} of {}", index, publisherCount);
RTMPClient client = publisherList.remove(index);
if (client != null) {
disconnect(client);
}
}
test.testLivePublish(i);
}
// wait for all to finish
latch.await();
......@@ -230,6 +283,8 @@ public class PublisherConnectLoadTest {
test.tearDown();
} catch (Exception e) {
log.warn("Exception", e);
} finally {
log.info("Done");
}
}
......
......@@ -143,7 +143,7 @@ public class FBLiveConnectTest {
ObjectMap status = ((ObjectMap) call.getArguments()[0]);
String code = (String) status.get("code");
switch (code) {
case "NetStream.Publish.Success":
case "NetStream.Publish.Start":
publishing = true;
break;
case "NetStream.UnPublish.Success":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment