Skip to content
Snippets Groups Projects
Commit 8cf70e0c authored by Paul Gregoire's avatar Paul Gregoire
Browse files

Update version to 1.3.29. Refactor RTMP packet handling, remove extra...

Update version to 1.3.29. Refactor RTMP packet handling, remove extra threading, move handler into callable.
parent 0f11e606
Branches
No related tags found
No related merge requests found
Showing
with 139 additions and 658 deletions
......@@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.28</version>
<version>1.3.29</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-client</artifactId>
......@@ -16,6 +16,9 @@
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
<plugin>
<groupId>net.revelc.code.formatter</groupId>
<artifactId>formatter-maven-plugin</artifactId>
......@@ -43,15 +46,6 @@
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-release-plugin</artifactId>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
<useReleaseProfile>false</useReleaseProfile>
<releaseProfiles>release</releaseProfiles>
<goals>deploy</goals>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
......
......@@ -18,7 +18,7 @@ public final class Red5Client {
/**
* Current server version with revision
*/
public static final String VERSION = "Red5 Client 1.3.28";
public static final String VERSION = "Red5 Client 1.3.29";
/**
* Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope
......
......@@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.28</version>
<version>1.3.29</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
......@@ -113,7 +113,7 @@
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.28</version>
<version>1.3.29</version>
</dependency> -->
<dependency>
<groupId>junit</groupId>
......
......@@ -57,12 +57,12 @@ public final class Red5 {
/**
* Server version with revision
*/
public static final String VERSION = "Red5 Server 1.3.28";
public static final String VERSION = "Red5 Server 1.3.29";
/**
* Server version for fmsVer requests
*/
public static final String FMS_VERSION = "RED5/1,3,28,0";
public static final String FMS_VERSION = "RED5/1,3,29,0";
/**
* Server capabilities
......
......@@ -10,14 +10,9 @@ package org.red5.server.net.rtmp;
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.core.session.IoSession;
import org.red5.io.object.StreamAction;
import org.red5.server.api.Red5;
import org.red5.server.api.event.IEventDispatcher;
import org.red5.server.api.service.IPendingServiceCall;
import org.red5.server.api.service.IPendingServiceCallback;
......@@ -58,9 +53,6 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
private static boolean isDebug = log.isDebugEnabled();
// thread pool for handling receive
protected final ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool();
/** {@inheritDoc} */
public void connectionOpened(RTMPConnection conn) {
if (isTrace) {
......@@ -106,20 +98,12 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
// NOTE: If we respond to "publish" with "NetStream.Publish.BadName",
// the client sends a few stream packets before stopping; we need to ignore them.
if (stream != null) {
EnsuresPacketExecutionOrder epeo = null;
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
} else {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo);
}
epeo.addPacket(message);
// dispatch to stream
((IEventDispatcher) stream).dispatchEvent(message);
// release / clean up
message.release();
}
break;
case TYPE_FLEX_SHARED_OBJECT:
case TYPE_SHARED_OBJECT:
onSharedObject(conn, channel, header, (SharedObjectMessage) message);
break;
case TYPE_INVOKE:
case TYPE_FLEX_MESSAGE:
onCommand(conn, channel, header, (Invoke) message);
......@@ -136,14 +120,10 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
if (((Notify) message).getData() != null && stream != null) {
// Stream metadata
if (stream != null) {
EnsuresPacketExecutionOrder epeo = null;
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
} else {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo);
}
epeo.addPacket(message);
// dispatch to stream
((IEventDispatcher) stream).dispatchEvent(message);
// release / clean up
message.release();
}
} else {
onCommand(conn, channel, header, (Notify) message);
......@@ -152,6 +132,10 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
case TYPE_PING:
onPing(conn, channel, header, (Ping) message);
break;
case TYPE_FLEX_SHARED_OBJECT:
case TYPE_SHARED_OBJECT:
onSharedObject(conn, channel, header, (SharedObjectMessage) message);
break;
case TYPE_BYTES_READ:
onStreamBytesRead(conn, channel, header, (BytesRead) message);
break;
......@@ -191,18 +175,6 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
if (conn.getStateCode() != RTMP.STATE_DISCONNECTED) {
// inform any callbacks for pending calls that the connection is closed
conn.sendPendingServiceCallsCloseError();
// clean up / remove the packet execution attribute
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
if (epeo != null) {
epeo.shutdown();
}
if (conn.removeAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
log.debug("Removed packet execution attribute");
}
}
// shutdown the executor
recvDispatchExecutor.shutdownNow();
// close the connection
if (conn.getStateCode() != RTMP.STATE_DISCONNECTING) {
conn.close();
......@@ -381,112 +353,4 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status
*/
protected abstract void onSharedObject(RTMPConnection conn, Channel channel, Header source, SharedObjectMessage message);
/**
* Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream
* and keeps all incoming events in order.
*/
private class EnsuresPacketExecutionOrder implements Runnable {
private final IEventDispatcher stream;
private final RTMPConnection conn;
private ConcurrentLinkedQueue<IRTMPEvent> events = new ConcurrentLinkedQueue<>();
private AtomicBoolean submitted = new AtomicBoolean();
private volatile String threadName;
private boolean shutdown;
public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) {
log.debug("Created for stream: {} connection: {}", stream, conn);
this.stream = stream;
this.conn = conn;
}
/**
* Shutdown and clean up.
*/
public void shutdown() {
log.debug("Shutdown; events: {}", events.size());
// set shutdown flag preventing further adds
shutdown = true;
// release all events
events.forEach(event -> {
event.release();
});
// clear the queue
events.clear();
}
/**
* Add packet to the stream's incoming queue.
*
* @param packet
*/
public void addPacket(IRTMPEvent packet) {
if (!shutdown) {
log.debug("addPacket: {}", packet);
// add to queue
events.offer(packet);
// if we are not already running, submit for execution
if (submitted.compareAndSet(false, true)) {
// use last 3 digits of nano time to identify different thread instance
threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L));
log.debug("Submit: {}", threadName);
recvDispatchExecutor.submit(this);
}
} else {
log.debug("Shutdown, not adding packet");
}
}
@Override
public void run() {
// use int to identify different thread instance
Thread.currentThread().setName(threadName);
// always set connection local on dispatch threads
Red5.setConnectionLocal(conn);
try {
// we were created for a reason, grab the event; add short timeout just in case
IRTMPEvent packet = events.peek();
// null check just in case queue was drained before we woke
if (packet != null && events.remove(packet)) {
if (isDebug) {
log.debug("Taken packet: {}", packet);
}
// dispatch to stream
stream.dispatchEvent(packet);
// release / clean up
packet.release();
}
} catch (Exception e) {
log.warn("Exception polling for next message", e);
}
// set null before resubmit
Red5.setConnectionLocal(null);
// check for shutdown and then submit or resubmit
if (!shutdown) {
if (events.isEmpty()) {
log.debug("Queue is empty");
if (submitted.compareAndSet(true, false)) {
// false state will allow resubmit at the next add
log.debug("Allow new submit");
}
} else {
// use last 3 digits of nano time to identify different thread instance
threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L));
if (isDebug) {
log.debug("Resubmit: {}", threadName);
}
// resubmitting rather than looping until empty plays nice with other threads
recvDispatchExecutor.submit(this);
}
} else {
log.debug("Shutdown, no more submits");
}
}
}
}
/*
* RED5 Open Source Media Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
* required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/
package org.red5.server.net.rtmp;
public interface IReceivedMessageTaskQueueListener {
void onTaskAdded(ReceivedMessageTaskQueue queue);
void onTaskRemoved(ReceivedMessageTaskQueue queue);
}
......@@ -29,6 +29,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.mina.core.buffer.IoBuffer;
......@@ -79,7 +80,6 @@ import org.red5.server.stream.SingleItemSubscriberStream;
import org.red5.server.stream.StreamService;
import org.red5.server.util.ScopeUtils;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.concurrent.ListenableFuture;
......@@ -90,7 +90,7 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* RTMP connection. Stores information about client streams, data transfer channels, pending RPC calls, bandwidth configuration, AMF
* encoding type (AMF0/AMF3), connection state (is alive, last ping time and ping result) and session.
*/
public abstract class RTMPConnection extends BaseConnection implements IStreamCapableConnection, IServiceCapableConnection, IReceivedMessageTaskQueueListener {
public abstract class RTMPConnection extends BaseConnection implements IStreamCapableConnection, IServiceCapableConnection {
public static final String RTMP_SESSION_ID = "rtmp.sessionid";
......@@ -102,8 +102,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
public static final Object RTMP_HANDLER = "rtmp.handler";
public final static String RTMP_EXECUTION_ORDERER = "rtmp.execution.orderer";
/**
* Marker byte for standard or non-encrypted RTMP data.
*/
......@@ -142,6 +140,11 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
// ~320 streams seems like a sufficient max amount of streams for a single connection
public static final double MAX_RESERVED_STREAMS = 320;
/**
* Updater for taskCount field.
*/
private static final AtomicIntegerFieldUpdater<RTMPConnection> receivedQueueSizeUpdater = AtomicIntegerFieldUpdater.newUpdater(RTMPConnection.class, "receivedQueueSize");
/**
* Initial channel capacity
*/
......@@ -189,13 +192,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
*/
protected transient ConcurrentMap<Integer, Channel> channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel);
/**
* Queues of tasks for every channel
*
* @see org.red5.server.net.rtmp.ReceivedMessageTaskQueue
*/
protected final transient ConcurrentMap<Integer, ReceivedMessageTaskQueue> tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);
/**
* Client streams
*
......@@ -208,6 +204,11 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
*/
protected transient Set<Number> reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Number, Boolean>(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel));
/**
* Received packet queue size
*/
protected volatile int receivedQueueSize;
/**
* Transaction identifier for remote commands.
*/
......@@ -329,11 +330,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
*/
protected transient ThreadPoolTaskExecutor executor;
/**
* Thread pool for guarding deadlocks.
*/
protected transient ThreadPoolTaskScheduler deadlockGuardScheduler;
/**
* Keep-alive worker flag
*/
......@@ -354,16 +350,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
*/
protected final AtomicLong packetSequence = new AtomicLong();
/**
* Specify the size of queue that will trigger audio packet dropping, disabled if it's 0
* */
private Integer executorQueueSizeToDropAudioPackets = 0;
/**
* Keep track of current queue size
* */
private final AtomicInteger currentQueueSize = new AtomicInteger();
/**
* Wait for handshake task.
*/
......@@ -385,9 +371,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
protected transient Future<?> receivedPacketFuture;
/**
* Queue for received RTMP packets.
* Queue for received RTMP packets. This is a transfer queue from which packets are passed to a handler.
*/
protected LinkedTransferQueue<Packet> receivedPacketQueue = new LinkedTransferQueue<>();
protected volatile LinkedTransferQueue<Packet> receivedPacketQueue = new LinkedTransferQueue<>();
/**
* Creates anonymous RTMP connection without scope.
......@@ -691,11 +677,6 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
log.trace("Channels: {}", channels);
}
}
/*
* ReceivedMessageTaskQueue queue = tasksByChannels.remove(channelId); if (queue != null) { if (isConnected()) { // if connected, drain and process the tasks queued-up
* log.debug("Processing remaining tasks at close for channel: {}", channelId); processTasksQueue(queue); } queue.removeAllTasks(); } else if (isTrace) {
* log.trace("No task queue for id: {}", channelId); }
*/
chan = null;
}
......@@ -1449,123 +1430,36 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
if (maxHandlingTimeout > 0) {
packet.setExpirationTime(System.currentTimeMillis() + maxHandlingTimeout);
}
if (executor != null) {
final byte dataType = packet.getHeader().getDataType();
// route these types outside the executor
switch (dataType) {
case Constants.TYPE_PING:
case Constants.TYPE_ABORT:
case Constants.TYPE_BYTES_READ:
case Constants.TYPE_CHUNK_SIZE:
case Constants.TYPE_CLIENT_BANDWIDTH:
case Constants.TYPE_SERVER_BANDWIDTH:
// pass message to the handler
try {
handler.messageReceived(this, packet);
} catch (Exception e) {
log.error("Error processing received message {}", sessionId, e);
}
break;
default:
final String messageType = getMessageType(packet);
try {
// increment the packet number
final long packetNumber = packetSequence.incrementAndGet();
if (executorQueueSizeToDropAudioPackets > 0 && currentQueueSize.get() >= executorQueueSizeToDropAudioPackets) {
if (packet.getHeader().getDataType() == Constants.TYPE_AUDIO_DATA) {
// if there's a backlog of messages in the queue. Flash might have sent a burst of messages after a network congestion. Throw away packets that we are able to discard.
log.info("Queue threshold reached. Discarding packet: session=[{}], msgType=[{}], packetNum=[{}]", sessionId, messageType, packetNumber);
return;
}
}
int streamId = packet.getHeader().getStreamId().intValue();
if (isTrace) {
log.trace("Handling message for streamId: {}, channelId: {} Channels: {}", streamId, packet.getHeader().getChannelId(), channels);
}
// create a task to setProcessing the message
ReceivedMessageTask task = new ReceivedMessageTask(sessionId, packet, handler, this);
task.setPacketNumber(packetNumber);
// create a task queue
ReceivedMessageTaskQueue newStreamTasks = new ReceivedMessageTaskQueue(streamId, this);
// put the queue in the task by stream map
ReceivedMessageTaskQueue currentStreamTasks = tasksByStreams.putIfAbsent(streamId, newStreamTasks);
if (currentStreamTasks != null) {
// add the task to the existing queue
currentStreamTasks.addTask(task);
} else {
// add the task to the newly created and just added queue
newStreamTasks.addTask(task);
}
} catch (Exception e) {
log.error("Incoming message handling failed on session=[" + sessionId + "], messageType=[" + messageType + "]", e);
if (isDebug) {
log.debug("Execution rejected on {} - {}", sessionId, RTMP.states[getStateCode()]);
log.debug("Lock permits - decode: {} encode: {}", decoderLock.availablePermits(), encoderLock.availablePermits());
}
}
}
} else {
//if (isTrace) {
// log.trace("Executor is null on {} state: {}", sessionId, RTMP.states[getStateCode()]);
//}
// queue the packet
receivedPacketQueue.offer(packet);
if (receivedPacketQueue.offer(packet)) {
// increment the queue size
receivedQueueSizeUpdater.incrementAndGet(this);
}
// create the future for processing the queue as needed
if (receivedPacketFuture == null) {
final RTMPConnection conn = this;
receivedPacketFuture = receivedPacketExecutor.submit(() -> {
Thread.currentThread().setName(String.format("RTMPRecv@%s", sessionId));
do {
try {
do {
// DTS appears to be off only by < 10ms
Packet p = receivedPacketQueue.poll(maxPollTimeout, TimeUnit.MILLISECONDS); // wait for a packet up to 10 seconds
if (p != null) {
if (isTrace) {
log.trace("Handle received packet: {}", p);
}
// pass message to the handler where any sorting or delays would need to be injected
handler.messageReceived(conn, p);
}
} catch (InterruptedException e) {
log.debug("Interrupted while waiting for message {} state: {}", sessionId, RTMP.states[getStateCode()], e);
} catch (Exception e) {
log.error("Error processing received message {} state: {}", sessionId, RTMP.states[getStateCode()], e);
}
} while (state.getState() < RTMP.STATE_ERROR); // keep processing unless we pass the error state
receivedPacketFuture = null;
receivedPacketQueue.clear();
});
}
}
}
@Override
public void onTaskAdded(ReceivedMessageTaskQueue queue) {
currentQueueSize.incrementAndGet();
processTasksQueue(queue);
}
@Override
public void onTaskRemoved(ReceivedMessageTaskQueue queue) {
currentQueueSize.decrementAndGet();
processTasksQueue(queue);
}
@SuppressWarnings("unchecked")
private void processTasksQueue(final ReceivedMessageTaskQueue currentStreamTasks) {
int streamId = currentStreamTasks.getStreamId();
if (isTrace) {
log.trace("Process tasks for streamId {}", streamId);
}
final ReceivedMessageTask task = currentStreamTasks.getTaskToProcess();
if (task != null) {
Packet packet = task.getPacket();
// decrement the queue size
receivedQueueSizeUpdater.decrementAndGet(this);
// call directly to the handler
//handler.messageReceived(conn, p);
// create a task to handle the packet
ReceivedMessageTask task = new ReceivedMessageTask(conn, p);
try {
final String messageType = getMessageType(packet);
@SuppressWarnings("unchecked")
ListenableFuture<Packet> future = (ListenableFuture<Packet>) executor.submitListenable(new ListenableFutureTask<Packet>(task));
future.addCallback(new ListenableFutureCallback<Packet>() {
final long startTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
int getProcessingTime() {
return (int) (System.currentTimeMillis() - startTime);
......@@ -1573,42 +1467,39 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
@SuppressWarnings("null")
public void onFailure(Throwable t) {
log.debug("ReceivedMessageTask failure: {}", t);
if (log.isWarnEnabled()) {
log.warn("onFailure - session: {}, msgtype: {}, processingTime: {}, packetNum: {}", sessionId, messageType, getProcessingTime(), task.getPacketNumber());
}
currentStreamTasks.removeTask(task);
log.warn("onFailure - processingTime: {} msgtype: {} task: {}", getProcessingTime(), getMessageType(packet), task);
}
public void onSuccess(@Nullable
Packet packet) {
log.debug("ReceivedMessageTask success");
if (isDebug) {
log.debug("onSuccess - session: {}, msgType: {}, processingTime: {}, packetNum: {}", sessionId, messageType, getProcessingTime(), task.getPacketNumber());
}
currentStreamTasks.removeTask(task);
@SuppressWarnings("null")
public void onSuccess(Packet packet) {
log.debug("onSuccess - processingTime: {} msgtype: {} task: {}", getProcessingTime(), getMessageType(packet), task);
}
});
} catch (TaskRejectedException tre) {
Throwable[] suppressed = tre.getSuppressed();
for (Throwable t : suppressed) {
log.warn("Suppressed exception on {}", sessionId, t);
log.warn("Suppressed exception on {}", task, t);
}
log.info("Rejected message: {} on {}", packet, sessionId);
currentStreamTasks.removeTask(task);
log.info("Rejected task: {}", task);
} catch (Throwable e) {
log.error("Incoming message handling failed on session=[" + sessionId + "]", e);
log.error("Incoming message failed task: {}", task, e);
if (isDebug) {
log.debug("Execution rejected on {} - {}", getSessionId(), RTMP.states[getStateCode()]);
log.debug("Lock permits - decode: {} encode: {}", decoderLock.availablePermits(), encoderLock.availablePermits());
}
currentStreamTasks.removeTask(task);
}
} else {
if (isTrace) {
log.trace("Channel {} task queue is empty", streamId);
}
} while (state.getState() < RTMP.STATE_ERROR); // keep processing unless we pass the error state
} catch (InterruptedException e) {
log.debug("Interrupted while waiting for message {} state: {}", sessionId, RTMP.states[getStateCode()], e);
} catch (Exception e) {
log.error("Error processing received message {} state: {}", sessionId, RTMP.states[getStateCode()], e);
} finally {
receivedPacketFuture = null;
receivedPacketQueue.clear();
}
});
}
}
......@@ -1654,7 +1545,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
* @return current message queue size
*/
protected int currentQueueSize() {
return currentQueueSize.get();
return receivedQueueSize;
}
/** {@inheritDoc} */
......@@ -1809,8 +1700,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
*
* @return the deadlockGuardScheduler
*/
@Deprecated(since = "1.3.29", forRemoval = true)
public ThreadPoolTaskScheduler getDeadlockGuardScheduler() {
return deadlockGuardScheduler;
return null;
}
/**
......@@ -1819,8 +1711,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
* @param deadlockGuardScheduler
* the deadlockGuardScheduler to set
*/
@Deprecated(since = "1.3.29", forRemoval = true)
public void setDeadlockGuardScheduler(ThreadPoolTaskScheduler deadlockGuardScheduler) {
this.deadlockGuardScheduler = deadlockGuardScheduler;
// unused
}
/**
......@@ -1943,8 +1836,9 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
* @param executorQueueSizeToDropAudioPackets
* queue size
*/
@Deprecated(since = "1.3.29", forRemoval = true)
public void setExecutorQueueSizeToDropAudioPackets(Integer executorQueueSizeToDropAudioPackets) {
this.executorQueueSizeToDropAudioPackets = executorQueueSizeToDropAudioPackets;
// unused
}
@Override
......
......@@ -8,7 +8,6 @@
package org.red5.server.net.rtmp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.red5.io.object.StreamAction;
......@@ -54,12 +53,11 @@ import org.red5.server.so.SharedObjectService;
import org.red5.server.stream.StreamService;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
/**
* RTMP events handler.
*/
public class RTMPHandler extends BaseRTMPHandler implements DisposableBean {
public class RTMPHandler extends BaseRTMPHandler {
protected static Logger log = Red5LoggerFactory.getLogger(RTMPHandler.class);
......@@ -87,17 +85,6 @@ public class RTMPHandler extends BaseRTMPHandler implements DisposableBean {
*/
private boolean dispatchStreamActions;
@Override
public void destroy() throws Exception {
log.info("Shutting down handling");
if (!recvDispatchExecutor.isTerminated()) {
List<Runnable> waiters = recvDispatchExecutor.shutdownNow();
if (isDebug) {
log.debug("Tasks waiting at shutdown: {}", waiters);
}
}
}
/**
* Setter for server object.
*
......
......@@ -7,17 +7,12 @@
package org.red5.server.net.rtmp;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.red5.server.api.Red5;
import org.red5.server.net.rtmp.message.Packet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* Wraps processing of incoming messages.
......@@ -26,34 +21,26 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
*/
public final class ReceivedMessageTask implements Callable<Packet> {
private final static Logger log = LoggerFactory.getLogger(ReceivedMessageTask.class);
private final RTMPConnection conn;
private final IRTMPHandler handler;
private final String sessionId;
private Packet packet;
private long packetNumber;
private final Packet packet;
private final AtomicBoolean processing = new AtomicBoolean(false);
private final int hashCode;
private Thread taskThread;
private AtomicBoolean processing = new AtomicBoolean(false);
private ScheduledFuture<Runnable> deadlockFuture;
public ReceivedMessageTask(String sessionId, Packet packet, IRTMPHandler handler, RTMPConnection conn) {
this.sessionId = sessionId;
this.packet = packet;
this.handler = handler;
public ReceivedMessageTask(RTMPConnection conn, Packet packet) {
this.conn = conn;
this.packet = packet;
this.handler = conn.getHandler();
// generate hash code
hashCode = Objects.hash(conn.getSessionId(), packet);
}
public Packet call() throws Exception {
//keep a ref for executor thread
taskThread = Thread.currentThread();
if (processing.compareAndSet(false, true)) {
// set connection to thread local
Red5.setConnectionLocal(conn);
try {
......@@ -65,78 +52,19 @@ public final class ReceivedMessageTask implements Callable<Packet> {
// clear thread local
Red5.setConnectionLocal(null);
}
if (log.isDebugEnabled()) {
log.debug("Processing message for {} is processed: {} packet #{}", sessionId, packet.isProcessed(), packetNumber);
}
return packet;
}
/**
* Runs deadlock guard task
*
* @param deadlockGuardTask
* deadlock guard task
*/
@SuppressWarnings({ "unchecked", "null" })
public void runDeadlockFuture(Runnable deadlockGuardTask) {
if (deadlockFuture == null) {
ThreadPoolTaskScheduler deadlockGuard = conn.getDeadlockGuardScheduler();
if (deadlockGuard != null) {
try {
deadlockFuture = (ScheduledFuture<Runnable>) deadlockGuard.schedule(deadlockGuardTask, new Date(packet.getExpirationTime()));
} catch (TaskRejectedException e) {
log.warn("DeadlockGuard task is rejected for {}", sessionId, e);
}
} else {
log.debug("Deadlock guard is null for {}", sessionId);
}
} else {
log.warn("Deadlock future is already create for {}", sessionId);
}
throw new IllegalStateException("Task is already being processed");
}
/**
* Cancels deadlock future if it was created
*/
public void cancelDeadlockFuture() {
// kill the future for the deadlock since processing is complete
if (deadlockFuture != null) {
deadlockFuture.cancel(true);
}
}
/**
* Marks task as processing if it is not processing yet.
*
* @return true if successful, or false otherwise
*/
public boolean setProcessing() {
return processing.compareAndSet(false, true);
}
public long getPacketNumber() {
return packetNumber;
}
public void setPacketNumber(long packetNumber) {
this.packetNumber = packetNumber;
return packet;
}
public Packet getPacket() {
return packet;
}
public Thread getTaskThread() {
return taskThread;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode());
result = prime * result + packet.getHeader().hashCode();
return result;
return hashCode;
}
@Override
......@@ -148,11 +76,7 @@ public final class ReceivedMessageTask implements Callable<Packet> {
if (getClass() != obj.getClass())
return false;
ReceivedMessageTask other = (ReceivedMessageTask) obj;
if (sessionId == null) {
if (other.sessionId != null) {
return false;
}
} else if (!sessionId.equals(other.sessionId)) {
if (!this.equals(other)) {
return false;
}
if (!packet.getHeader().equals(other.packet.getHeader())) {
......@@ -163,7 +87,7 @@ public final class ReceivedMessageTask implements Callable<Packet> {
@Override
public String toString() {
return "[sessionId: " + sessionId + "; packetNumber: " + packetNumber + "; processing: " + processing.get() + "]";
return "[sessionId: " + conn.getSessionId() + ", processing: " + processing.get() + "]";
}
}
\ No newline at end of file
/*
* RED5 Open Source Media Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
* required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/
package org.red5.server.net.rtmp;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.red5.server.net.rtmp.message.Packet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Contains queue of tasks for processing messages in the specified channel. Ensures that all messages which has got in channel will be processed sequentially.
*
* @author Maria Chabanets (m.e.platova@gmail.com)
*/
public class ReceivedMessageTaskQueue {
private final static Logger log = LoggerFactory.getLogger(ReceivedMessageTaskQueue.class);
/**
* Stream id.
*/
private final int streamId;
/**
* Task queue.
*/
private final Queue<ReceivedMessageTask> tasks = new ConcurrentLinkedQueue<ReceivedMessageTask>();
/**
* Listener which tries to process message from queue if queue has been changed.
*/
private final IReceivedMessageTaskQueueListener listener;
public ReceivedMessageTaskQueue(int streamId, IReceivedMessageTaskQueueListener listener) {
this.streamId = streamId;
this.listener = listener;
}
/**
* Adds new task to the end of the queue.
*
* @param task
* received message task
*/
public void addTask(ReceivedMessageTask task) {
tasks.add(task);
Packet packet = task.getPacket();
// don't run the deadlock guard if timeout is <= 0
if (packet.getExpirationTime() > 0L) {
// run a deadlock guard so hanging tasks will be interrupted
task.runDeadlockFuture(new DeadlockGuard(task));
}
if (listener != null) {
listener.onTaskAdded(this);
}
}
/**
* Removes the specified task from the queue.
*
* @param task
* received message task
*/
public void removeTask(ReceivedMessageTask task) {
if (tasks.remove(task)) {
task.cancelDeadlockFuture();
if (listener != null) {
listener.onTaskRemoved(this);
}
}
}
/**
* Gets first task from queue if it can be processed. If first task is already in process it returns null.
*
* @return task that can be processed or null otherwise
*/
public ReceivedMessageTask getTaskToProcess() {
ReceivedMessageTask task = tasks.peek();
if (task != null && task.setProcessing()) {
return task;
}
return null;
}
/**
* Removes all tasks from the queue.
*/
public void removeAllTasks() {
for (ReceivedMessageTask task : tasks) {
task.cancelDeadlockFuture();
}
tasks.clear();
}
public int getStreamId() {
return streamId;
}
/**
* Prevents deadlocked message handling.
*/
private class DeadlockGuard implements Runnable {
private final ReceivedMessageTask task;
/**
* Creates the deadlock guard to prevent a message task from taking too long to setProcessing.
*
* @param task
*/
private DeadlockGuard(ReceivedMessageTask task) {
this.task = task;
if (log.isTraceEnabled()) {
log.trace("DeadlockGuard is created for {}", task);
}
}
/**
* Save the reference to the task, and wait until the maxHandlingTimeout has elapsed. If it elapsed, remove task and stop its thread.
* */
public void run() {
Packet packet = task.getPacket();
if (log.isTraceEnabled()) {
log.trace("DeadlockGuard is started for {}", task);
}
// skip processed packet
if (packet.isProcessed()) {
log.debug("DeadlockGuard skipping task for processed packet {}", task);
} else if (packet.isExpired()) {
// try to interrupt thread
log.debug("DeadlockGuard skipping task for expired packet {}", task);
} else {
// if the message task is not yet done or is not expired interrupt
// if the task thread hasn't been interrupted check its live-ness
// if the task thread is alive, interrupt it
Thread taskThread = task.getTaskThread();
if (taskThread == null) {
log.debug("Task has not start yet {}", task);
} else if (!taskThread.isInterrupted() && taskThread.isAlive()) {
log.warn("Interrupting unfinished active task {}", task);
taskThread.interrupt();
} else {
log.debug("Unfinished task {} already interrupted", task);
}
}
// remove this task from the queue in any case
removeTask(task);
}
}
}
......@@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.28</version>
<version>1.3.29</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-io</artifactId>
......
......@@ -24,7 +24,7 @@
<name>Red5</name>
<description>The Red5 server</description>
<groupId>org.red5</groupId>
<version>1.3.28</version>
<version>1.3.29</version>
<url>https://github.com/Red5/red5-server</url>
<inceptionYear>2005</inceptionYear>
<organization>
......
......@@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.28</version>
<version>1.3.29</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server</artifactId>
......@@ -21,9 +21,6 @@
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-toolchains-plugin</artifactId>
</plugin>
<plugin>
<groupId>net.revelc.code.formatter</groupId>
<artifactId>formatter-maven-plugin</artifactId>
......
......@@ -27,7 +27,6 @@
<property name="threadNamePrefix" value="RTMPConnectionScheduler-" />
</bean>
<!--
<bean id="messageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="${rtmp.executor.core_pool_size}" />
<property name="maxPoolSize" value="${rtmp.executor.max_pool_size}" />
......@@ -37,14 +36,6 @@
<property name="threadNamePrefix" value="RTMPConnectionExecutor-" />
</bean>
<bean id="deadlockGuardScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="${rtmp.deadlockguard.sheduler.pool_size}" />
<property name="daemon" value="false" />
<property name="waitForTasksToCompleteOnShutdown" value="true" />
<property name="threadNamePrefix" value="DeadlockGuardScheduler-" />
</bean>
-->
<!-- RTMP connection manager -->
<bean id="rtmpConnManager" class="org.red5.server.net.rtmp.RTMPConnManager" />
......@@ -106,6 +97,8 @@
<bean id="rtmpMinaConnection" scope="prototype" class="org.red5.server.net.rtmp.RTMPMinaConnection">
<!-- Executor for scheduled tasks -->
<property name="scheduler" ref="rtmpScheduler" />
<!-- Executor for received tasks -->
<property name="executor" ref="messageExecutor" />
<!-- Ping clients every X ms. Set to 0 to disable ghost detection code. -->
<property name="pingInterval" value="${rtmp.ping_interval}" />
<!-- Disconnect client after X ms of not responding. -->
......
......@@ -52,7 +52,7 @@ mina.logfilter.enable=false
rtmp.scheduler.pool_size=8
rtmp.deadlockguard.sheduler.pool_size=8
# message executor configs (per application) - adjust these as needed if you get tasks rejected
rtmp.executor.core_pool_size=4
rtmp.executor.core_pool_size=1
rtmp.executor.max_pool_size=32
rtmp.executor.queue_capacity=64
# drop audio packets when queue is almost full, to disable this, set to 0
......
......@@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.28</version>
<version>1.3.29</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-service</artifactId>
......
......@@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.2.30</version>
<version>1.3.29</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-servlet</artifactId>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment