Skip to content
Snippets Groups Projects
Commit fd4a5ec2 authored by Paul Gregoire's avatar Paul Gregoire
Browse files

Refactor RTMP event logic to ensure ordered processing with minimal locking

parent e5f83877
Branches
No related tags found
No related merge requests found
......@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
* @author Jon Valliere
*/
public class RTMPClient extends BaseRTMPClientHandler {
private static final Logger log = LoggerFactory.getLogger(RTMPClient.class);
protected static final int CONNECTOR_WORKER_TIMEOUT = 7000; // milliseconds
......
......@@ -42,6 +42,8 @@ public final class Red5 {
private static Logger log = Red5LoggerFactory.getLogger(Red5.class);
private static boolean isDebug = log.isDebugEnabled();
/**
* Connection associated with the current thread. Each connection runs in a separate thread.
*/
......@@ -118,7 +120,7 @@ public final class Red5 {
* Thread local connection
*/
public static void setConnectionLocal(IConnection connection) {
if (log.isDebugEnabled()) {
if (isDebug) {
log.debug("Set connection: {} with thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
try {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
......@@ -148,7 +150,9 @@ public final class Red5 {
WeakReference<IConnection> ref = connThreadLocal.get();
if (ref != null) {
IConnection connection = ref.get();
log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
if (isDebug) {
log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
}
return connection;
} else {
return null;
......
......@@ -10,9 +10,9 @@ package org.red5.server.net.rtmp;
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.core.session.IoSession;
......@@ -58,8 +58,8 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
private static boolean isDebug = log.isDebugEnabled();
// single thread pool for handling receive
protected final static ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool();
// thread pool for handling receive
protected final ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool();
/** {@inheritDoc} */
public void connectionOpened(RTMPConnection conn) {
......@@ -106,10 +106,12 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
// NOTE: If we respond to "publish" with "NetStream.Publish.BadName",
// the client sends a few stream packets before stopping; we need to ignore them.
if (stream != null) {
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null && stream != null) {
EnsuresPacketExecutionOrder epeo = null;
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
} else {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo);
}
epeo.addPacket(message);
}
......@@ -133,12 +135,16 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
case TYPE_FLEX_STREAM_SEND:
if (((Notify) message).getData() != null && stream != null) {
// Stream metadata
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null) {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
if (stream != null) {
EnsuresPacketExecutionOrder epeo = null;
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
} else {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo);
}
epeo.addPacket(message);
}
epeo.addPacket(message);
} else {
onCommand(conn, channel, header, (Notify) message);
}
......@@ -185,6 +191,18 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
if (conn.getStateCode() != RTMP.STATE_DISCONNECTED) {
// inform any callbacks for pending calls that the connection is closed
conn.sendPendingServiceCallsCloseError();
// clean up / remove the packet execution attribute
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
if (epeo != null) {
epeo.shutdown();
}
if (conn.removeAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
log.debug("Removed packet execution attribute");
}
}
// shutdown the executor
recvDispatchExecutor.shutdownNow();
// close the connection
if (conn.getStateCode() != RTMP.STATE_DISCONNECTING) {
conn.close();
......@@ -367,60 +385,107 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
* Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream
* and keeps all incoming events in order.
*/
private static class EnsuresPacketExecutionOrder implements Runnable {
private class EnsuresPacketExecutionOrder implements Runnable {
public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder";
private final IEventDispatcher stream;
private LinkedBlockingQueue<IRTMPEvent> events = new LinkedBlockingQueue<>();
private final RTMPConnection conn;
private AtomicBoolean state = new AtomicBoolean();
private ConcurrentLinkedQueue<IRTMPEvent> events = new ConcurrentLinkedQueue<>();
private final IEventDispatcher stream;
private AtomicBoolean submitted = new AtomicBoolean();
private final RTMPConnection conn;
private volatile String threadName;
private int iter;
private boolean shutdown;
public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) {
log.debug("Created for stream: {} connection: {}", stream, conn);
this.stream = stream;
this.conn = conn;
}
/**
* Shutdown and clean up.
*/
public void shutdown() {
log.debug("Shutdown; events: {}", events.size());
// set shutdown flag preventing further adds
shutdown = true;
// release all events
events.forEach(event -> {
event.release();
});
// clear the queue
events.clear();
}
/**
* Add packet to the stream's incoming queue.
*
* @param packet
*/
public void addPacket(IRTMPEvent packet) {
events.offer(packet);
if (state.compareAndSet(false, true)) {
recvDispatchExecutor.submit(this);
if (!shutdown) {
log.debug("addPacket: {}", packet);
// add to queue
events.offer(packet);
// if we are not already running, submit for execution
if (submitted.compareAndSet(false, true)) {
// use last 3 digits of nano time to identify different thread instance
threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L));
log.debug("Submit: {}", threadName);
recvDispatchExecutor.submit(this);
}
} else {
log.debug("Shutdown, not adding packet");
}
}
@Override
public void run() {
// use int to identify different thread instance
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++));
iter &= 7;
Thread.currentThread().setName(threadName);
// always set connection local on dispatch threads
Red5.setConnectionLocal(conn);
// we were created for a reason, grab the event
IRTMPEvent message = events.poll();
// null check just in case queue was drained before we woke
if (message != null) {
// dispatch to stream
stream.dispatchEvent(message);
// release / clean up
message.release();
try {
// we were created for a reason, grab the event; add short timeout just in case
IRTMPEvent packet = events.peek();
// null check just in case queue was drained before we woke
if (packet != null && events.remove(packet)) {
if (isDebug) {
log.debug("Taken packet: {}", packet);
}
// dispatch to stream
stream.dispatchEvent(packet);
// release / clean up
packet.release();
}
} catch (Exception e) {
log.warn("Exception polling for next message", e);
}
// set null before resubmit
Red5.setConnectionLocal(null);
// resubmit for another go if we have more
if (!events.isEmpty()) {
recvDispatchExecutor.submit(this);
// check for shutdown and then submit or resubmit
if (!shutdown) {
if (events.isEmpty()) {
log.debug("Queue is empty");
if (submitted.compareAndSet(true, false)) {
// false state will allow resubmit at the next add
log.debug("Allow new submit");
}
} else {
// use last 3 digits of nano time to identify different thread instance
threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L));
if (isDebug) {
log.debug("Resubmit: {}", threadName);
}
// resubmitting rather than looping until empty plays nice with other threads
recvDispatchExecutor.submit(this);
}
} else {
state.set(false);
log.debug("Shutdown, no more submits");
}
// resubmitting rather than looping until empty plays nice with other threads
}
}
......
......@@ -102,6 +102,8 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
public static final Object RTMP_HANDLER = "rtmp.handler";
public final static String RTMP_EXECUTION_ORDERER = "rtmp.execution.orderer";
/**
* Marker byte for standard or non-encrypted RTMP data.
*/
......
......@@ -109,13 +109,13 @@ public class ConversionUtils {
return source;
}
final Class<?> sourceClass = source.getClass();
log.info("Source: {} target: {}", sourceClass, target);
log.debug("Source: {} target: {}", sourceClass, target);
if (target.isInstance(source) || target.isAssignableFrom(sourceClass)) {
log.info("Source: {} is already an instance of: {}", source, target);
log.debug("Source: {} is already an instance of: {}", source, target);
return source;
}
if (target.isArray()) {
log.info("Source: {} to target array: {}", source, target);
log.debug("Source: {} to target array: {}", source, target);
return convertToArray(source, target);
}
if (target.equals(String.class)) {
......@@ -134,10 +134,10 @@ public class ConversionUtils {
return convertMapToList((LinkedHashMap<?, ?>) source);
} else if (sourceClass.isArray()) {
if (List.class.isAssignableFrom(target)) {
log.info("Source: {} to target list: {}", source, target);
log.debug("Source: {} to target list: {}", source, target);
return Arrays.stream((Object[]) source).collect(Collectors.toCollection(ArrayList::new));
} else if (Set.class.isAssignableFrom(target)) {
log.info("Source: {} to target set: {}", source, target);
log.debug("Source: {} to target set: {}", source, target);
// special handling for sets when the source is a list
if (source instanceof List) {
return ((List<?>) source).stream().collect(Collectors.toCollection(HashSet::new));
......
......@@ -27,7 +27,6 @@ import java.util.stream.Stream;
import javax.websocket.Extension;
import javax.websocket.Session;
import javax.websocket.RemoteEndpoint.Basic;
import org.apache.commons.lang3.StringUtils;
import org.apache.tomcat.websocket.Constants;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment