diff --git a/client/src/main/java/org/red5/client/net/rtmp/RTMPClient.java b/client/src/main/java/org/red5/client/net/rtmp/RTMPClient.java index a354676ef74d6641fe505c48eb8a4462875ae853..21801d00df13504301f90967bc8136bd26f98af9 100644 --- a/client/src/main/java/org/red5/client/net/rtmp/RTMPClient.java +++ b/client/src/main/java/org/red5/client/net/rtmp/RTMPClient.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; * @author Jon Valliere */ public class RTMPClient extends BaseRTMPClientHandler { + private static final Logger log = LoggerFactory.getLogger(RTMPClient.class); protected static final int CONNECTOR_WORKER_TIMEOUT = 7000; // milliseconds diff --git a/common/src/main/java/org/red5/server/api/Red5.java b/common/src/main/java/org/red5/server/api/Red5.java index d42c9c0979892c5d5373827689ac6da7887e8b14..5d3886635cf802944d7190d8b9ac1d26d08f057f 100644 --- a/common/src/main/java/org/red5/server/api/Red5.java +++ b/common/src/main/java/org/red5/server/api/Red5.java @@ -42,6 +42,8 @@ public final class Red5 { private static Logger log = Red5LoggerFactory.getLogger(Red5.class); + private static boolean isDebug = log.isDebugEnabled(); + /** * Connection associated with the current thread. Each connection runs in a separate thread. */ @@ -118,7 +120,7 @@ public final class Red5 { * Thread local connection */ public static void setConnectionLocal(IConnection connection) { - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("Set connection: {} with thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName()); try { StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); @@ -148,7 +150,9 @@ public final class Red5 { WeakReference<IConnection> ref = connThreadLocal.get(); if (ref != null) { IConnection connection = ref.get(); - log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName()); + if (isDebug) { + log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName()); + } return connection; } else { return null; diff --git a/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java b/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java index dfeea93444ec9d3893b45d01e8fd8f0a33376cbb..58474da9c64b5ff2e1f8a023ee31d27cbb429a34 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java +++ b/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java @@ -10,9 +10,9 @@ package org.red5.server.net.rtmp; import java.lang.ref.WeakReference; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.core.session.IoSession; @@ -58,8 +58,8 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status private static boolean isDebug = log.isDebugEnabled(); - // single thread pool for handling receive - protected final static ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool(); + // thread pool for handling receive + protected final ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool(); /** {@inheritDoc} */ public void connectionOpened(RTMPConnection conn) { @@ -106,10 +106,12 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status // NOTE: If we respond to "publish" with "NetStream.Publish.BadName", // the client sends a few stream packets before stopping; we need to ignore them. if (stream != null) { - EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); - if (epeo == null && stream != null) { + EnsuresPacketExecutionOrder epeo = null; + if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { + epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER); + } else { epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn); - conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); + conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo); } epeo.addPacket(message); } @@ -133,12 +135,16 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status case TYPE_FLEX_STREAM_SEND: if (((Notify) message).getData() != null && stream != null) { // Stream metadata - EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); - if (epeo == null) { - epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn); - conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); + if (stream != null) { + EnsuresPacketExecutionOrder epeo = null; + if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { + epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER); + } else { + epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn); + conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo); + } + epeo.addPacket(message); } - epeo.addPacket(message); } else { onCommand(conn, channel, header, (Notify) message); } @@ -185,6 +191,18 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status if (conn.getStateCode() != RTMP.STATE_DISCONNECTED) { // inform any callbacks for pending calls that the connection is closed conn.sendPendingServiceCallsCloseError(); + // clean up / remove the packet execution attribute + if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { + EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER); + if (epeo != null) { + epeo.shutdown(); + } + if (conn.removeAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { + log.debug("Removed packet execution attribute"); + } + } + // shutdown the executor + recvDispatchExecutor.shutdownNow(); // close the connection if (conn.getStateCode() != RTMP.STATE_DISCONNECTING) { conn.close(); @@ -367,60 +385,107 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status * Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream * and keeps all incoming events in order. */ - private static class EnsuresPacketExecutionOrder implements Runnable { + private class EnsuresPacketExecutionOrder implements Runnable { - public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder"; + private final IEventDispatcher stream; - private LinkedBlockingQueue<IRTMPEvent> events = new LinkedBlockingQueue<>(); + private final RTMPConnection conn; - private AtomicBoolean state = new AtomicBoolean(); + private ConcurrentLinkedQueue<IRTMPEvent> events = new ConcurrentLinkedQueue<>(); - private final IEventDispatcher stream; + private AtomicBoolean submitted = new AtomicBoolean(); - private final RTMPConnection conn; + private volatile String threadName; - private int iter; + private boolean shutdown; public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) { + log.debug("Created for stream: {} connection: {}", stream, conn); this.stream = stream; this.conn = conn; } + /** + * Shutdown and clean up. + */ + public void shutdown() { + log.debug("Shutdown; events: {}", events.size()); + // set shutdown flag preventing further adds + shutdown = true; + // release all events + events.forEach(event -> { + event.release(); + }); + // clear the queue + events.clear(); + } + /** * Add packet to the stream's incoming queue. + * * @param packet */ public void addPacket(IRTMPEvent packet) { - events.offer(packet); - if (state.compareAndSet(false, true)) { - recvDispatchExecutor.submit(this); + if (!shutdown) { + log.debug("addPacket: {}", packet); + // add to queue + events.offer(packet); + // if we are not already running, submit for execution + if (submitted.compareAndSet(false, true)) { + // use last 3 digits of nano time to identify different thread instance + threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L)); + log.debug("Submit: {}", threadName); + recvDispatchExecutor.submit(this); + } + } else { + log.debug("Shutdown, not adding packet"); } } + @Override public void run() { // use int to identify different thread instance - Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++)); - iter &= 7; + Thread.currentThread().setName(threadName); // always set connection local on dispatch threads Red5.setConnectionLocal(conn); - // we were created for a reason, grab the event - IRTMPEvent message = events.poll(); - // null check just in case queue was drained before we woke - if (message != null) { - // dispatch to stream - stream.dispatchEvent(message); - // release / clean up - message.release(); + try { + // we were created for a reason, grab the event; add short timeout just in case + IRTMPEvent packet = events.peek(); + // null check just in case queue was drained before we woke + if (packet != null && events.remove(packet)) { + if (isDebug) { + log.debug("Taken packet: {}", packet); + } + // dispatch to stream + stream.dispatchEvent(packet); + // release / clean up + packet.release(); + } + } catch (Exception e) { + log.warn("Exception polling for next message", e); } // set null before resubmit Red5.setConnectionLocal(null); - // resubmit for another go if we have more - if (!events.isEmpty()) { - recvDispatchExecutor.submit(this); + // check for shutdown and then submit or resubmit + if (!shutdown) { + if (events.isEmpty()) { + log.debug("Queue is empty"); + if (submitted.compareAndSet(true, false)) { + // false state will allow resubmit at the next add + log.debug("Allow new submit"); + } + } else { + // use last 3 digits of nano time to identify different thread instance + threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L)); + if (isDebug) { + log.debug("Resubmit: {}", threadName); + } + // resubmitting rather than looping until empty plays nice with other threads + recvDispatchExecutor.submit(this); + } } else { - state.set(false); + log.debug("Shutdown, no more submits"); } - // resubmitting rather than looping until empty plays nice with other threads } } diff --git a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java index 31005c3668338af482bf5d3f6b9d33dc01eac569..ae1ec49595e141c33a15b9e388b7fd6f88d82198 100755 --- a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java +++ b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java @@ -102,6 +102,8 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa public static final Object RTMP_HANDLER = "rtmp.handler"; + public final static String RTMP_EXECUTION_ORDERER = "rtmp.execution.orderer"; + /** * Marker byte for standard or non-encrypted RTMP data. */ diff --git a/io/src/main/java/org/red5/io/utils/ConversionUtils.java b/io/src/main/java/org/red5/io/utils/ConversionUtils.java index babaa2e2785cd9cf6f6d8b793d6b346042a4bdeb..6a9db33f21a4411d65bc60dccaf33ce1683012e2 100644 --- a/io/src/main/java/org/red5/io/utils/ConversionUtils.java +++ b/io/src/main/java/org/red5/io/utils/ConversionUtils.java @@ -109,13 +109,13 @@ public class ConversionUtils { return source; } final Class<?> sourceClass = source.getClass(); - log.info("Source: {} target: {}", sourceClass, target); + log.debug("Source: {} target: {}", sourceClass, target); if (target.isInstance(source) || target.isAssignableFrom(sourceClass)) { - log.info("Source: {} is already an instance of: {}", source, target); + log.debug("Source: {} is already an instance of: {}", source, target); return source; } if (target.isArray()) { - log.info("Source: {} to target array: {}", source, target); + log.debug("Source: {} to target array: {}", source, target); return convertToArray(source, target); } if (target.equals(String.class)) { @@ -134,10 +134,10 @@ public class ConversionUtils { return convertMapToList((LinkedHashMap<?, ?>) source); } else if (sourceClass.isArray()) { if (List.class.isAssignableFrom(target)) { - log.info("Source: {} to target list: {}", source, target); + log.debug("Source: {} to target list: {}", source, target); return Arrays.stream((Object[]) source).collect(Collectors.toCollection(ArrayList::new)); } else if (Set.class.isAssignableFrom(target)) { - log.info("Source: {} to target set: {}", source, target); + log.debug("Source: {} to target set: {}", source, target); // special handling for sets when the source is a list if (source instanceof List) { return ((List<?>) source).stream().collect(Collectors.toCollection(HashSet::new)); diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java index a609bdb81f5ff862e14140dc7409fd4984e55402..f6461505789249c5456cb1dce05fa96e5c653e41 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java @@ -27,7 +27,6 @@ import java.util.stream.Stream; import javax.websocket.Extension; import javax.websocket.Session; -import javax.websocket.RemoteEndpoint.Basic; import org.apache.commons.lang3.StringUtils; import org.apache.tomcat.websocket.Constants;