diff --git a/client/pom.xml b/client/pom.xml index d5ade07d0582d91ecd28014f78aefab2cacbaaee..1d2614a710d7c90c109fbc30d7b23ff830943cd9 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.28</version> + <version>1.3.29</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-client</artifactId> @@ -16,6 +16,9 @@ <build> <defaultGoal>install</defaultGoal> <plugins> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> <plugin> <groupId>net.revelc.code.formatter</groupId> <artifactId>formatter-maven-plugin</artifactId> @@ -43,15 +46,6 @@ </archive> </configuration> </plugin> - <plugin> - <artifactId>maven-release-plugin</artifactId> - <configuration> - <autoVersionSubmodules>true</autoVersionSubmodules> - <useReleaseProfile>false</useReleaseProfile> - <releaseProfiles>release</releaseProfiles> - <goals>deploy</goals> - </configuration> - </plugin> </plugins> </build> <dependencies> diff --git a/client/src/main/java/org/red5/client/Red5Client.java b/client/src/main/java/org/red5/client/Red5Client.java index 46102692de1ad611ea3bb283429da8e63f2a31c0..5d168b8259a47b3cfacfbf637a7ba356946db8ab 100644 --- a/client/src/main/java/org/red5/client/Red5Client.java +++ b/client/src/main/java/org/red5/client/Red5Client.java @@ -18,7 +18,7 @@ public final class Red5Client { /** * Current server version with revision */ - public static final String VERSION = "Red5 Client 1.3.28"; + public static final String VERSION = "Red5 Client 1.3.29"; /** * Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope diff --git a/common/pom.xml b/common/pom.xml index 0db367b127d83e44ce9b2a6f7bcb55109b40dfbb..713c995de6dc23a50e5fc982d8748afc0450d3e1 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.28</version> + <version>1.3.29</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-server-common</artifactId> @@ -113,7 +113,7 @@ <dependency> <groupId>net.engio</groupId> <artifactId>mbassador</artifactId> - <version>1.3.28</version> + <version>1.3.29</version> </dependency> --> <dependency> <groupId>junit</groupId> diff --git a/common/src/main/java/org/red5/server/api/Red5.java b/common/src/main/java/org/red5/server/api/Red5.java index 5d3886635cf802944d7190d8b9ac1d26d08f057f..a2dc5dc3d5caf138cec1ae9f239eae6d3825eb99 100644 --- a/common/src/main/java/org/red5/server/api/Red5.java +++ b/common/src/main/java/org/red5/server/api/Red5.java @@ -57,12 +57,12 @@ public final class Red5 { /** * Server version with revision */ - public static final String VERSION = "Red5 Server 1.3.28"; + public static final String VERSION = "Red5 Server 1.3.29"; /** * Server version for fmsVer requests */ - public static final String FMS_VERSION = "RED5/1,3,28,0"; + public static final String FMS_VERSION = "RED5/1,3,29,0"; /** * Server capabilities diff --git a/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java b/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java index 58474da9c64b5ff2e1f8a023ee31d27cbb429a34..6e44ca95fbbe0a4989d6e262ae3be87ff71f5676 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java +++ b/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java @@ -10,14 +10,9 @@ package org.red5.server.net.rtmp; import java.lang.ref.WeakReference; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.core.session.IoSession; import org.red5.io.object.StreamAction; -import org.red5.server.api.Red5; import org.red5.server.api.event.IEventDispatcher; import org.red5.server.api.service.IPendingServiceCall; import org.red5.server.api.service.IPendingServiceCallback; @@ -58,9 +53,6 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status private static boolean isDebug = log.isDebugEnabled(); - // thread pool for handling receive - protected final ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool(); - /** {@inheritDoc} */ public void connectionOpened(RTMPConnection conn) { if (isTrace) { @@ -106,20 +98,12 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status // NOTE: If we respond to "publish" with "NetStream.Publish.BadName", // the client sends a few stream packets before stopping; we need to ignore them. if (stream != null) { - EnsuresPacketExecutionOrder epeo = null; - if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { - epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER); - } else { - epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn); - conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo); - } - epeo.addPacket(message); + // dispatch to stream + ((IEventDispatcher) stream).dispatchEvent(message); + // release / clean up + message.release(); } break; - case TYPE_FLEX_SHARED_OBJECT: - case TYPE_SHARED_OBJECT: - onSharedObject(conn, channel, header, (SharedObjectMessage) message); - break; case TYPE_INVOKE: case TYPE_FLEX_MESSAGE: onCommand(conn, channel, header, (Invoke) message); @@ -136,14 +120,10 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status if (((Notify) message).getData() != null && stream != null) { // Stream metadata if (stream != null) { - EnsuresPacketExecutionOrder epeo = null; - if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { - epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER); - } else { - epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn); - conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo); - } - epeo.addPacket(message); + // dispatch to stream + ((IEventDispatcher) stream).dispatchEvent(message); + // release / clean up + message.release(); } } else { onCommand(conn, channel, header, (Notify) message); @@ -152,6 +132,10 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status case TYPE_PING: onPing(conn, channel, header, (Ping) message); break; + case TYPE_FLEX_SHARED_OBJECT: + case TYPE_SHARED_OBJECT: + onSharedObject(conn, channel, header, (SharedObjectMessage) message); + break; case TYPE_BYTES_READ: onStreamBytesRead(conn, channel, header, (BytesRead) message); break; @@ -191,18 +175,6 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status if (conn.getStateCode() != RTMP.STATE_DISCONNECTED) { // inform any callbacks for pending calls that the connection is closed conn.sendPendingServiceCallsCloseError(); - // clean up / remove the packet execution attribute - if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { - EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER); - if (epeo != null) { - epeo.shutdown(); - } - if (conn.removeAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) { - log.debug("Removed packet execution attribute"); - } - } - // shutdown the executor - recvDispatchExecutor.shutdownNow(); // close the connection if (conn.getStateCode() != RTMP.STATE_DISCONNECTING) { conn.close(); @@ -381,112 +353,4 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status */ protected abstract void onSharedObject(RTMPConnection conn, Channel channel, Header source, SharedObjectMessage message); - /** - * Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream - * and keeps all incoming events in order. - */ - private class EnsuresPacketExecutionOrder implements Runnable { - - private final IEventDispatcher stream; - - private final RTMPConnection conn; - - private ConcurrentLinkedQueue<IRTMPEvent> events = new ConcurrentLinkedQueue<>(); - - private AtomicBoolean submitted = new AtomicBoolean(); - - private volatile String threadName; - - private boolean shutdown; - - public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) { - log.debug("Created for stream: {} connection: {}", stream, conn); - this.stream = stream; - this.conn = conn; - } - - /** - * Shutdown and clean up. - */ - public void shutdown() { - log.debug("Shutdown; events: {}", events.size()); - // set shutdown flag preventing further adds - shutdown = true; - // release all events - events.forEach(event -> { - event.release(); - }); - // clear the queue - events.clear(); - } - - /** - * Add packet to the stream's incoming queue. - * - * @param packet - */ - public void addPacket(IRTMPEvent packet) { - if (!shutdown) { - log.debug("addPacket: {}", packet); - // add to queue - events.offer(packet); - // if we are not already running, submit for execution - if (submitted.compareAndSet(false, true)) { - // use last 3 digits of nano time to identify different thread instance - threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L)); - log.debug("Submit: {}", threadName); - recvDispatchExecutor.submit(this); - } - } else { - log.debug("Shutdown, not adding packet"); - } - } - - @Override - public void run() { - // use int to identify different thread instance - Thread.currentThread().setName(threadName); - // always set connection local on dispatch threads - Red5.setConnectionLocal(conn); - try { - // we were created for a reason, grab the event; add short timeout just in case - IRTMPEvent packet = events.peek(); - // null check just in case queue was drained before we woke - if (packet != null && events.remove(packet)) { - if (isDebug) { - log.debug("Taken packet: {}", packet); - } - // dispatch to stream - stream.dispatchEvent(packet); - // release / clean up - packet.release(); - } - } catch (Exception e) { - log.warn("Exception polling for next message", e); - } - // set null before resubmit - Red5.setConnectionLocal(null); - // check for shutdown and then submit or resubmit - if (!shutdown) { - if (events.isEmpty()) { - log.debug("Queue is empty"); - if (submitted.compareAndSet(true, false)) { - // false state will allow resubmit at the next add - log.debug("Allow new submit"); - } - } else { - // use last 3 digits of nano time to identify different thread instance - threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L)); - if (isDebug) { - log.debug("Resubmit: {}", threadName); - } - // resubmitting rather than looping until empty plays nice with other threads - recvDispatchExecutor.submit(this); - } - } else { - log.debug("Shutdown, no more submits"); - } - } - } - } diff --git a/common/src/main/java/org/red5/server/net/rtmp/IReceivedMessageTaskQueueListener.java b/common/src/main/java/org/red5/server/net/rtmp/IReceivedMessageTaskQueueListener.java deleted file mode 100644 index 340149d035130992431c5b3f3ad8dbd9e8e76555..0000000000000000000000000000000000000000 --- a/common/src/main/java/org/red5/server/net/rtmp/IReceivedMessageTaskQueueListener.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * RED5 Open Source Media Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ - -package org.red5.server.net.rtmp; - -public interface IReceivedMessageTaskQueueListener { - void onTaskAdded(ReceivedMessageTaskQueue queue); - - void onTaskRemoved(ReceivedMessageTaskQueue queue); -} diff --git a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java index 6a5c171529d861ec1b537bd573208bdc412c461a..c146430075fe1fbd1e1eb1c0a9bf64b19fe5b74d 100755 --- a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java +++ b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java @@ -29,6 +29,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import org.apache.mina.core.buffer.IoBuffer; @@ -79,7 +80,6 @@ import org.red5.server.stream.SingleItemSubscriberStream; import org.red5.server.stream.StreamService; import org.red5.server.util.ScopeUtils; import org.springframework.core.task.TaskRejectedException; -import org.springframework.lang.Nullable; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.concurrent.ListenableFuture; @@ -90,7 +90,7 @@ import org.springframework.util.concurrent.ListenableFutureTask; * RTMP connection. Stores information about client streams, data transfer channels, pending RPC calls, bandwidth configuration, AMF * encoding type (AMF0/AMF3), connection state (is alive, last ping time and ping result) and session. */ -public abstract class RTMPConnection extends BaseConnection implements IStreamCapableConnection, IServiceCapableConnection, IReceivedMessageTaskQueueListener { +public abstract class RTMPConnection extends BaseConnection implements IStreamCapableConnection, IServiceCapableConnection { public static final String RTMP_SESSION_ID = "rtmp.sessionid"; @@ -102,8 +102,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa public static final Object RTMP_HANDLER = "rtmp.handler"; - public final static String RTMP_EXECUTION_ORDERER = "rtmp.execution.orderer"; - /** * Marker byte for standard or non-encrypted RTMP data. */ @@ -142,6 +140,11 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa // ~320 streams seems like a sufficient max amount of streams for a single connection public static final double MAX_RESERVED_STREAMS = 320; + /** + * Updater for taskCount field. + */ + private static final AtomicIntegerFieldUpdater<RTMPConnection> receivedQueueSizeUpdater = AtomicIntegerFieldUpdater.newUpdater(RTMPConnection.class, "receivedQueueSize"); + /** * Initial channel capacity */ @@ -189,13 +192,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa */ protected transient ConcurrentMap<Integer, Channel> channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel); - /** - * Queues of tasks for every channel - * - * @see org.red5.server.net.rtmp.ReceivedMessageTaskQueue - */ - protected final transient ConcurrentMap<Integer, ReceivedMessageTaskQueue> tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel); - /** * Client streams * @@ -208,6 +204,11 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa */ protected transient Set<Number> reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Number, Boolean>(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel)); + /** + * Received packet queue size + */ + protected volatile int receivedQueueSize; + /** * Transaction identifier for remote commands. */ @@ -329,11 +330,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa */ protected transient ThreadPoolTaskExecutor executor; - /** - * Thread pool for guarding deadlocks. - */ - protected transient ThreadPoolTaskScheduler deadlockGuardScheduler; - /** * Keep-alive worker flag */ @@ -354,16 +350,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa */ protected final AtomicLong packetSequence = new AtomicLong(); - /** - * Specify the size of queue that will trigger audio packet dropping, disabled if it's 0 - * */ - private Integer executorQueueSizeToDropAudioPackets = 0; - - /** - * Keep track of current queue size - * */ - private final AtomicInteger currentQueueSize = new AtomicInteger(); - /** * Wait for handshake task. */ @@ -385,9 +371,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa protected transient Future<?> receivedPacketFuture; /** - * Queue for received RTMP packets. + * Queue for received RTMP packets. This is a transfer queue from which packets are passed to a handler. */ - protected LinkedTransferQueue<Packet> receivedPacketQueue = new LinkedTransferQueue<>(); + protected volatile LinkedTransferQueue<Packet> receivedPacketQueue = new LinkedTransferQueue<>(); /** * Creates anonymous RTMP connection without scope. @@ -691,11 +677,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa log.trace("Channels: {}", channels); } } - /* - * ReceivedMessageTaskQueue queue = tasksByChannels.remove(channelId); if (queue != null) { if (isConnected()) { // if connected, drain and process the tasks queued-up - * log.debug("Processing remaining tasks at close for channel: {}", channelId); processTasksQueue(queue); } queue.removeAllTasks(); } else if (isTrace) { - * log.trace("No task queue for id: {}", channelId); } - */ chan = null; } @@ -1449,166 +1430,76 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa if (maxHandlingTimeout > 0) { packet.setExpirationTime(System.currentTimeMillis() + maxHandlingTimeout); } - if (executor != null) { - final byte dataType = packet.getHeader().getDataType(); - // route these types outside the executor - switch (dataType) { - case Constants.TYPE_PING: - case Constants.TYPE_ABORT: - case Constants.TYPE_BYTES_READ: - case Constants.TYPE_CHUNK_SIZE: - case Constants.TYPE_CLIENT_BANDWIDTH: - case Constants.TYPE_SERVER_BANDWIDTH: - // pass message to the handler - try { - handler.messageReceived(this, packet); - } catch (Exception e) { - log.error("Error processing received message {}", sessionId, e); - } - break; - default: - final String messageType = getMessageType(packet); - try { - // increment the packet number - final long packetNumber = packetSequence.incrementAndGet(); - if (executorQueueSizeToDropAudioPackets > 0 && currentQueueSize.get() >= executorQueueSizeToDropAudioPackets) { - if (packet.getHeader().getDataType() == Constants.TYPE_AUDIO_DATA) { - // if there's a backlog of messages in the queue. Flash might have sent a burst of messages after a network congestion. Throw away packets that we are able to discard. - log.info("Queue threshold reached. Discarding packet: session=[{}], msgType=[{}], packetNum=[{}]", sessionId, messageType, packetNumber); - return; - } - } - int streamId = packet.getHeader().getStreamId().intValue(); - if (isTrace) { - log.trace("Handling message for streamId: {}, channelId: {} Channels: {}", streamId, packet.getHeader().getChannelId(), channels); - } - // create a task to setProcessing the message - ReceivedMessageTask task = new ReceivedMessageTask(sessionId, packet, handler, this); - task.setPacketNumber(packetNumber); - // create a task queue - ReceivedMessageTaskQueue newStreamTasks = new ReceivedMessageTaskQueue(streamId, this); - // put the queue in the task by stream map - ReceivedMessageTaskQueue currentStreamTasks = tasksByStreams.putIfAbsent(streamId, newStreamTasks); - if (currentStreamTasks != null) { - // add the task to the existing queue - currentStreamTasks.addTask(task); - } else { - // add the task to the newly created and just added queue - newStreamTasks.addTask(task); - } - } catch (Exception e) { - log.error("Incoming message handling failed on session=[" + sessionId + "], messageType=[" + messageType + "]", e); - if (isDebug) { - log.debug("Execution rejected on {} - {}", sessionId, RTMP.states[getStateCode()]); - log.debug("Lock permits - decode: {} encode: {}", decoderLock.availablePermits(), encoderLock.availablePermits()); - } - } - } - } else { - //if (isTrace) { - // log.trace("Executor is null on {} state: {}", sessionId, RTMP.states[getStateCode()]); - //} - // queue the packet - receivedPacketQueue.offer(packet); - // create the future for processing the queue as needed - if (receivedPacketFuture == null) { - final RTMPConnection conn = this; - receivedPacketFuture = receivedPacketExecutor.submit(() -> { - Thread.currentThread().setName(String.format("RTMPRecv@%s", sessionId)); + // queue the packet + if (receivedPacketQueue.offer(packet)) { + // increment the queue size + receivedQueueSizeUpdater.incrementAndGet(this); + } + // create the future for processing the queue as needed + if (receivedPacketFuture == null) { + final RTMPConnection conn = this; + receivedPacketFuture = receivedPacketExecutor.submit(() -> { + Thread.currentThread().setName(String.format("RTMPRecv@%s", sessionId)); + try { do { - try { - // DTS appears to be off only by < 10ms - Packet p = receivedPacketQueue.poll(maxPollTimeout, TimeUnit.MILLISECONDS); // wait for a packet up to 10 seconds - if (p != null) { - if (isTrace) { - log.trace("Handle received packet: {}", p); + // DTS appears to be off only by < 10ms + Packet p = receivedPacketQueue.poll(maxPollTimeout, TimeUnit.MILLISECONDS); // wait for a packet up to 10 seconds + if (p != null) { + if (isTrace) { + log.trace("Handle received packet: {}", p); + } + // decrement the queue size + receivedQueueSizeUpdater.decrementAndGet(this); + // call directly to the handler + //handler.messageReceived(conn, p); + // create a task to handle the packet + ReceivedMessageTask task = new ReceivedMessageTask(conn, p); + try { + @SuppressWarnings("unchecked") + ListenableFuture<Packet> future = (ListenableFuture<Packet>) executor.submitListenable(new ListenableFutureTask<Packet>(task)); + future.addCallback(new ListenableFutureCallback<Packet>() { + + long startTime = System.currentTimeMillis(); + + int getProcessingTime() { + return (int) (System.currentTimeMillis() - startTime); + } + + @SuppressWarnings("null") + public void onFailure(Throwable t) { + log.warn("onFailure - processingTime: {} msgtype: {} task: {}", getProcessingTime(), getMessageType(packet), task); + } + + @SuppressWarnings("null") + public void onSuccess(Packet packet) { + log.debug("onSuccess - processingTime: {} msgtype: {} task: {}", getProcessingTime(), getMessageType(packet), task); + } + + }); + } catch (TaskRejectedException tre) { + Throwable[] suppressed = tre.getSuppressed(); + for (Throwable t : suppressed) { + log.warn("Suppressed exception on {}", task, t); + } + log.info("Rejected task: {}", task); + } catch (Throwable e) { + log.error("Incoming message failed task: {}", task, e); + if (isDebug) { + log.debug("Execution rejected on {} - {}", getSessionId(), RTMP.states[getStateCode()]); + log.debug("Lock permits - decode: {} encode: {}", decoderLock.availablePermits(), encoderLock.availablePermits()); } - // pass message to the handler where any sorting or delays would need to be injected - handler.messageReceived(conn, p); } - } catch (InterruptedException e) { - log.debug("Interrupted while waiting for message {} state: {}", sessionId, RTMP.states[getStateCode()], e); - } catch (Exception e) { - log.error("Error processing received message {} state: {}", sessionId, RTMP.states[getStateCode()], e); } } while (state.getState() < RTMP.STATE_ERROR); // keep processing unless we pass the error state + } catch (InterruptedException e) { + log.debug("Interrupted while waiting for message {} state: {}", sessionId, RTMP.states[getStateCode()], e); + } catch (Exception e) { + log.error("Error processing received message {} state: {}", sessionId, RTMP.states[getStateCode()], e); + } finally { receivedPacketFuture = null; receivedPacketQueue.clear(); - }); - } - } - } - - @Override - public void onTaskAdded(ReceivedMessageTaskQueue queue) { - currentQueueSize.incrementAndGet(); - processTasksQueue(queue); - } - - @Override - public void onTaskRemoved(ReceivedMessageTaskQueue queue) { - currentQueueSize.decrementAndGet(); - processTasksQueue(queue); - } - - @SuppressWarnings("unchecked") - private void processTasksQueue(final ReceivedMessageTaskQueue currentStreamTasks) { - int streamId = currentStreamTasks.getStreamId(); - if (isTrace) { - log.trace("Process tasks for streamId {}", streamId); - } - final ReceivedMessageTask task = currentStreamTasks.getTaskToProcess(); - if (task != null) { - Packet packet = task.getPacket(); - try { - final String messageType = getMessageType(packet); - ListenableFuture<Packet> future = (ListenableFuture<Packet>) executor.submitListenable(new ListenableFutureTask<Packet>(task)); - future.addCallback(new ListenableFutureCallback<Packet>() { - - final long startTime = System.currentTimeMillis(); - - int getProcessingTime() { - return (int) (System.currentTimeMillis() - startTime); - } - - @SuppressWarnings("null") - public void onFailure(Throwable t) { - log.debug("ReceivedMessageTask failure: {}", t); - if (log.isWarnEnabled()) { - log.warn("onFailure - session: {}, msgtype: {}, processingTime: {}, packetNum: {}", sessionId, messageType, getProcessingTime(), task.getPacketNumber()); - } - currentStreamTasks.removeTask(task); - } - - public void onSuccess(@Nullable - Packet packet) { - log.debug("ReceivedMessageTask success"); - if (isDebug) { - log.debug("onSuccess - session: {}, msgType: {}, processingTime: {}, packetNum: {}", sessionId, messageType, getProcessingTime(), task.getPacketNumber()); - } - currentStreamTasks.removeTask(task); - } - - }); - } catch (TaskRejectedException tre) { - Throwable[] suppressed = tre.getSuppressed(); - for (Throwable t : suppressed) { - log.warn("Suppressed exception on {}", sessionId, t); } - log.info("Rejected message: {} on {}", packet, sessionId); - currentStreamTasks.removeTask(task); - } catch (Throwable e) { - log.error("Incoming message handling failed on session=[" + sessionId + "]", e); - if (isDebug) { - log.debug("Execution rejected on {} - {}", getSessionId(), RTMP.states[getStateCode()]); - log.debug("Lock permits - decode: {} encode: {}", decoderLock.availablePermits(), encoderLock.availablePermits()); - } - currentStreamTasks.removeTask(task); - } - } else { - if (isTrace) { - log.trace("Channel {} task queue is empty", streamId); - } + }); } } @@ -1654,7 +1545,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa * @return current message queue size */ protected int currentQueueSize() { - return currentQueueSize.get(); + return receivedQueueSize; } /** {@inheritDoc} */ @@ -1809,8 +1700,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa * * @return the deadlockGuardScheduler */ + @Deprecated(since = "1.3.29", forRemoval = true) public ThreadPoolTaskScheduler getDeadlockGuardScheduler() { - return deadlockGuardScheduler; + return null; } /** @@ -1819,8 +1711,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa * @param deadlockGuardScheduler * the deadlockGuardScheduler to set */ + @Deprecated(since = "1.3.29", forRemoval = true) public void setDeadlockGuardScheduler(ThreadPoolTaskScheduler deadlockGuardScheduler) { - this.deadlockGuardScheduler = deadlockGuardScheduler; + // unused } /** @@ -1943,8 +1836,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa * @param executorQueueSizeToDropAudioPackets * queue size */ + @Deprecated(since = "1.3.29", forRemoval = true) public void setExecutorQueueSizeToDropAudioPackets(Integer executorQueueSizeToDropAudioPackets) { - this.executorQueueSizeToDropAudioPackets = executorQueueSizeToDropAudioPackets; + // unused } @Override @@ -2058,4 +1952,4 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa } -} +} \ No newline at end of file diff --git a/common/src/main/java/org/red5/server/net/rtmp/RTMPHandler.java b/common/src/main/java/org/red5/server/net/rtmp/RTMPHandler.java index cc3540d32da128aea45e83bb93cfc54649525050..551bb084cb74c7c4a3ab610b7e90706c802ea656 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/RTMPHandler.java +++ b/common/src/main/java/org/red5/server/net/rtmp/RTMPHandler.java @@ -8,7 +8,6 @@ package org.red5.server.net.rtmp; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.red5.io.object.StreamAction; @@ -54,12 +53,11 @@ import org.red5.server.so.SharedObjectService; import org.red5.server.stream.StreamService; import org.red5.server.util.ScopeUtils; import org.slf4j.Logger; -import org.springframework.beans.factory.DisposableBean; /** * RTMP events handler. */ -public class RTMPHandler extends BaseRTMPHandler implements DisposableBean { +public class RTMPHandler extends BaseRTMPHandler { protected static Logger log = Red5LoggerFactory.getLogger(RTMPHandler.class); @@ -87,17 +85,6 @@ public class RTMPHandler extends BaseRTMPHandler implements DisposableBean { */ private boolean dispatchStreamActions; - @Override - public void destroy() throws Exception { - log.info("Shutting down handling"); - if (!recvDispatchExecutor.isTerminated()) { - List<Runnable> waiters = recvDispatchExecutor.shutdownNow(); - if (isDebug) { - log.debug("Tasks waiting at shutdown: {}", waiters); - } - } - } - /** * Setter for server object. * diff --git a/common/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java b/common/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java index bf611d8c81e96b0fd2a2e4e939abc77190c3ce65..076ca1f7f5ef9a626cc7bcf53052466674f37ec5 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java +++ b/common/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java @@ -7,17 +7,12 @@ package org.red5.server.net.rtmp; -import java.util.Date; +import java.util.Objects; import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.red5.server.api.Red5; import org.red5.server.net.rtmp.message.Packet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.core.task.TaskRejectedException; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * Wraps processing of incoming messages. @@ -26,117 +21,50 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; */ public final class ReceivedMessageTask implements Callable<Packet> { - private final static Logger log = LoggerFactory.getLogger(ReceivedMessageTask.class); - private final RTMPConnection conn; private final IRTMPHandler handler; - private final String sessionId; - - private Packet packet; - - private long packetNumber; + private final Packet packet; - private final AtomicBoolean processing = new AtomicBoolean(false); + private final int hashCode; - private Thread taskThread; + private AtomicBoolean processing = new AtomicBoolean(false); - private ScheduledFuture<Runnable> deadlockFuture; - - public ReceivedMessageTask(String sessionId, Packet packet, IRTMPHandler handler, RTMPConnection conn) { - this.sessionId = sessionId; - this.packet = packet; - this.handler = handler; + public ReceivedMessageTask(RTMPConnection conn, Packet packet) { this.conn = conn; + this.packet = packet; + this.handler = conn.getHandler(); + // generate hash code + hashCode = Objects.hash(conn.getSessionId(), packet); } public Packet call() throws Exception { - //keep a ref for executor thread - taskThread = Thread.currentThread(); - // set connection to thread local - Red5.setConnectionLocal(conn); - try { - // pass message to the handler - handler.messageReceived(conn, packet); - // if we get this far, set done / completed flag - packet.setProcessed(true); - } finally { - // clear thread local - Red5.setConnectionLocal(null); - } - if (log.isDebugEnabled()) { - log.debug("Processing message for {} is processed: {} packet #{}", sessionId, packet.isProcessed(), packetNumber); - } - return packet; - } - - /** - * Runs deadlock guard task - * - * @param deadlockGuardTask - * deadlock guard task - */ - @SuppressWarnings({ "unchecked", "null" }) - public void runDeadlockFuture(Runnable deadlockGuardTask) { - if (deadlockFuture == null) { - ThreadPoolTaskScheduler deadlockGuard = conn.getDeadlockGuardScheduler(); - if (deadlockGuard != null) { - try { - deadlockFuture = (ScheduledFuture<Runnable>) deadlockGuard.schedule(deadlockGuardTask, new Date(packet.getExpirationTime())); - } catch (TaskRejectedException e) { - log.warn("DeadlockGuard task is rejected for {}", sessionId, e); - } - } else { - log.debug("Deadlock guard is null for {}", sessionId); + if (processing.compareAndSet(false, true)) { + // set connection to thread local + Red5.setConnectionLocal(conn); + try { + // pass message to the handler + handler.messageReceived(conn, packet); + // if we get this far, set done / completed flag + packet.setProcessed(true); + } finally { + // clear thread local + Red5.setConnectionLocal(null); } } else { - log.warn("Deadlock future is already create for {}", sessionId); - } - } - - /** - * Cancels deadlock future if it was created - */ - public void cancelDeadlockFuture() { - // kill the future for the deadlock since processing is complete - if (deadlockFuture != null) { - deadlockFuture.cancel(true); + throw new IllegalStateException("Task is already being processed"); } - } - - /** - * Marks task as processing if it is not processing yet. - * - * @return true if successful, or false otherwise - */ - public boolean setProcessing() { - return processing.compareAndSet(false, true); - } - - public long getPacketNumber() { - return packetNumber; - } - - public void setPacketNumber(long packetNumber) { - this.packetNumber = packetNumber; + return packet; } public Packet getPacket() { return packet; } - public Thread getTaskThread() { - return taskThread; - } - @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode()); - result = prime * result + packet.getHeader().hashCode(); - return result; + return hashCode; } @Override @@ -148,11 +76,7 @@ public final class ReceivedMessageTask implements Callable<Packet> { if (getClass() != obj.getClass()) return false; ReceivedMessageTask other = (ReceivedMessageTask) obj; - if (sessionId == null) { - if (other.sessionId != null) { - return false; - } - } else if (!sessionId.equals(other.sessionId)) { + if (!this.equals(other)) { return false; } if (!packet.getHeader().equals(other.packet.getHeader())) { @@ -163,7 +87,7 @@ public final class ReceivedMessageTask implements Callable<Packet> { @Override public String toString() { - return "[sessionId: " + sessionId + "; packetNumber: " + packetNumber + "; processing: " + processing.get() + "]"; + return "[sessionId: " + conn.getSessionId() + ", processing: " + processing.get() + "]"; } } \ No newline at end of file diff --git a/common/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTaskQueue.java b/common/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTaskQueue.java deleted file mode 100644 index f959186ac8d9cdef5dfcc9a57a48ec0541cda051..0000000000000000000000000000000000000000 --- a/common/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTaskQueue.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * RED5 Open Source Media Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ - -package org.red5.server.net.rtmp; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.red5.server.net.rtmp.message.Packet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Contains queue of tasks for processing messages in the specified channel. Ensures that all messages which has got in channel will be processed sequentially. - * - * @author Maria Chabanets (m.e.platova@gmail.com) - */ -public class ReceivedMessageTaskQueue { - - private final static Logger log = LoggerFactory.getLogger(ReceivedMessageTaskQueue.class); - - /** - * Stream id. - */ - private final int streamId; - - /** - * Task queue. - */ - private final Queue<ReceivedMessageTask> tasks = new ConcurrentLinkedQueue<ReceivedMessageTask>(); - - /** - * Listener which tries to process message from queue if queue has been changed. - */ - private final IReceivedMessageTaskQueueListener listener; - - public ReceivedMessageTaskQueue(int streamId, IReceivedMessageTaskQueueListener listener) { - this.streamId = streamId; - this.listener = listener; - } - - /** - * Adds new task to the end of the queue. - * - * @param task - * received message task - */ - public void addTask(ReceivedMessageTask task) { - tasks.add(task); - Packet packet = task.getPacket(); - // don't run the deadlock guard if timeout is <= 0 - if (packet.getExpirationTime() > 0L) { - // run a deadlock guard so hanging tasks will be interrupted - task.runDeadlockFuture(new DeadlockGuard(task)); - } - if (listener != null) { - listener.onTaskAdded(this); - } - } - - /** - * Removes the specified task from the queue. - * - * @param task - * received message task - */ - public void removeTask(ReceivedMessageTask task) { - if (tasks.remove(task)) { - task.cancelDeadlockFuture(); - if (listener != null) { - listener.onTaskRemoved(this); - } - } - } - - /** - * Gets first task from queue if it can be processed. If first task is already in process it returns null. - * - * @return task that can be processed or null otherwise - */ - public ReceivedMessageTask getTaskToProcess() { - ReceivedMessageTask task = tasks.peek(); - if (task != null && task.setProcessing()) { - return task; - } - return null; - } - - /** - * Removes all tasks from the queue. - */ - public void removeAllTasks() { - for (ReceivedMessageTask task : tasks) { - task.cancelDeadlockFuture(); - } - tasks.clear(); - } - - public int getStreamId() { - return streamId; - } - - /** - * Prevents deadlocked message handling. - */ - private class DeadlockGuard implements Runnable { - - private final ReceivedMessageTask task; - - /** - * Creates the deadlock guard to prevent a message task from taking too long to setProcessing. - * - * @param task - */ - private DeadlockGuard(ReceivedMessageTask task) { - this.task = task; - if (log.isTraceEnabled()) { - log.trace("DeadlockGuard is created for {}", task); - } - } - - /** - * Save the reference to the task, and wait until the maxHandlingTimeout has elapsed. If it elapsed, remove task and stop its thread. - * */ - public void run() { - Packet packet = task.getPacket(); - if (log.isTraceEnabled()) { - log.trace("DeadlockGuard is started for {}", task); - } - // skip processed packet - if (packet.isProcessed()) { - log.debug("DeadlockGuard skipping task for processed packet {}", task); - } else if (packet.isExpired()) { - // try to interrupt thread - log.debug("DeadlockGuard skipping task for expired packet {}", task); - } else { - // if the message task is not yet done or is not expired interrupt - // if the task thread hasn't been interrupted check its live-ness - // if the task thread is alive, interrupt it - Thread taskThread = task.getTaskThread(); - if (taskThread == null) { - log.debug("Task has not start yet {}", task); - } else if (!taskThread.isInterrupted() && taskThread.isAlive()) { - log.warn("Interrupting unfinished active task {}", task); - taskThread.interrupt(); - } else { - log.debug("Unfinished task {} already interrupted", task); - } - } - // remove this task from the queue in any case - removeTask(task); - } - } -} diff --git a/io/pom.xml b/io/pom.xml index 42bab11355c6c1d205014a0f82bb8529ad3c88b3..c97c00f29096c2f658e821cf8f306a635259de0b 100644 --- a/io/pom.xml +++ b/io/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.28</version> + <version>1.3.29</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-io</artifactId> diff --git a/pom.xml b/pom.xml index 275e7baf90777330755518159eca897be21db29d..d70d054201ac20ac8af5498a71f10b5c5b0f25d1 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ <name>Red5</name> <description>The Red5 server</description> <groupId>org.red5</groupId> - <version>1.3.28</version> + <version>1.3.29</version> <url>https://github.com/Red5/red5-server</url> <inceptionYear>2005</inceptionYear> <organization> diff --git a/server/pom.xml b/server/pom.xml index 271b782c6c7068c1aba40fd32d47479938335d7e..b7fb4bd4579b13faf0de7ade941a2ff31305c67a 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.28</version> + <version>1.3.29</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-server</artifactId> @@ -21,9 +21,6 @@ <plugin> <artifactId>maven-enforcer-plugin</artifactId> </plugin> - <plugin> - <artifactId>maven-toolchains-plugin</artifactId> - </plugin> <plugin> <groupId>net.revelc.code.formatter</groupId> <artifactId>formatter-maven-plugin</artifactId> diff --git a/server/src/main/server/conf/red5-core.xml b/server/src/main/server/conf/red5-core.xml index 0e178eed1aa886ad741c218ddf4bd89ec0993483..d8e2d04816d9bde6fd0d3a8bc81a8cd120e2c224 100644 --- a/server/src/main/server/conf/red5-core.xml +++ b/server/src/main/server/conf/red5-core.xml @@ -27,7 +27,6 @@ <property name="threadNamePrefix" value="RTMPConnectionScheduler-" /> </bean> -<!-- <bean id="messageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="${rtmp.executor.core_pool_size}" /> <property name="maxPoolSize" value="${rtmp.executor.max_pool_size}" /> @@ -36,14 +35,6 @@ <property name="waitForTasksToCompleteOnShutdown" value="true" /> <property name="threadNamePrefix" value="RTMPConnectionExecutor-" /> </bean> - - <bean id="deadlockGuardScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> - <property name="poolSize" value="${rtmp.deadlockguard.sheduler.pool_size}" /> - <property name="daemon" value="false" /> - <property name="waitForTasksToCompleteOnShutdown" value="true" /> - <property name="threadNamePrefix" value="DeadlockGuardScheduler-" /> - </bean> ---> <!-- RTMP connection manager --> <bean id="rtmpConnManager" class="org.red5.server.net.rtmp.RTMPConnManager" /> @@ -106,6 +97,8 @@ <bean id="rtmpMinaConnection" scope="prototype" class="org.red5.server.net.rtmp.RTMPMinaConnection"> <!-- Executor for scheduled tasks --> <property name="scheduler" ref="rtmpScheduler" /> + <!-- Executor for received tasks --> + <property name="executor" ref="messageExecutor" /> <!-- Ping clients every X ms. Set to 0 to disable ghost detection code. --> <property name="pingInterval" value="${rtmp.ping_interval}" /> <!-- Disconnect client after X ms of not responding. --> diff --git a/server/src/main/server/conf/red5.properties b/server/src/main/server/conf/red5.properties index 810dd2308e5fb9dda8020f8e70cb16a99eef7393..36242b41bdc02feef6f0143e227e77a79c0144a5 100644 --- a/server/src/main/server/conf/red5.properties +++ b/server/src/main/server/conf/red5.properties @@ -52,7 +52,7 @@ mina.logfilter.enable=false rtmp.scheduler.pool_size=8 rtmp.deadlockguard.sheduler.pool_size=8 # message executor configs (per application) - adjust these as needed if you get tasks rejected -rtmp.executor.core_pool_size=4 +rtmp.executor.core_pool_size=1 rtmp.executor.max_pool_size=32 rtmp.executor.queue_capacity=64 # drop audio packets when queue is almost full, to disable this, set to 0 diff --git a/service/pom.xml b/service/pom.xml index ff487b9e34c7da1a0a14f6ccb29c90a11ef2f2f1..523f848338424c923844f81fd2b09ce5ae8f6a81 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.28</version> + <version>1.3.29</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-service</artifactId> diff --git a/servlet/pom.xml b/servlet/pom.xml index 7b40ec06c5384b890173d628f2ab1c1654bd7fe4..4167e38f78b2b26ec44490dc63be437ca1d2f988 100644 --- a/servlet/pom.xml +++ b/servlet/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.2.30</version> + <version>1.3.29</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-servlet</artifactId>