diff --git a/client/pom.xml b/client/pom.xml index d95b59899f745b519000f2d7ff8e38c98d448d80..d5ade07d0582d91ecd28014f78aefab2cacbaaee 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.27</version> + <version>1.3.28</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-client</artifactId> diff --git a/client/src/main/java/org/red5/client/Red5Client.java b/client/src/main/java/org/red5/client/Red5Client.java index 5d9b3594e32753d07e9a83dc80b7b4c80792e6a0..46102692de1ad611ea3bb283429da8e63f2a31c0 100644 --- a/client/src/main/java/org/red5/client/Red5Client.java +++ b/client/src/main/java/org/red5/client/Red5Client.java @@ -18,7 +18,7 @@ public final class Red5Client { /** * Current server version with revision */ - public static final String VERSION = "Red5 Client 1.3.27"; + public static final String VERSION = "Red5 Client 1.3.28"; /** * Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope diff --git a/common/pom.xml b/common/pom.xml index c3d0e935f5d4c1ecff0282658b99eac3b3f120b4..0db367b127d83e44ce9b2a6f7bcb55109b40dfbb 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.27</version> + <version>1.3.28</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-server-common</artifactId> @@ -113,7 +113,7 @@ <dependency> <groupId>net.engio</groupId> <artifactId>mbassador</artifactId> - <version>1.3.27</version> + <version>1.3.28</version> </dependency> --> <dependency> <groupId>junit</groupId> diff --git a/common/src/main/java/org/red5/server/BaseConnection.java b/common/src/main/java/org/red5/server/BaseConnection.java index e2ff40e0b8efb56d91af189a1ba57699270a618e..70e84438c152314fdf30972f1485534f7537b44e 100644 --- a/common/src/main/java/org/red5/server/BaseConnection.java +++ b/common/src/main/java/org/red5/server/BaseConnection.java @@ -444,18 +444,18 @@ public abstract class BaseConnection extends AttributeStore implements IConnecti public void close() { if (closed) { log.debug("Already closed, nothing to do"); - return; - } - closed = true; - // disconnect - disconnect(); - // alert our listeners - if (connectionListeners != null) { - for (IConnectionListener listener : connectionListeners) { - listener.notifyDisconnected(this); + } else { + closed = true; + // disconnect + disconnect(); + // alert our listeners + if (connectionListeners != null) { + for (IConnectionListener listener : connectionListeners) { + listener.notifyDisconnected(this); + } + connectionListeners.clear(); + connectionListeners = null; } - connectionListeners.clear(); - connectionListeners = null; } } diff --git a/common/src/main/java/org/red5/server/api/Red5.java b/common/src/main/java/org/red5/server/api/Red5.java index e5dae97200614423c704979599612b2c743cee8f..d42c9c0979892c5d5373827689ac6da7887e8b14 100644 --- a/common/src/main/java/org/red5/server/api/Red5.java +++ b/common/src/main/java/org/red5/server/api/Red5.java @@ -55,12 +55,12 @@ public final class Red5 { /** * Server version with revision */ - public static final String VERSION = "Red5 Server 1.3.27"; + public static final String VERSION = "Red5 Server 1.3.28"; /** * Server version for fmsVer requests */ - public static final String FMS_VERSION = "RED5/1,3,27,0"; + public static final String FMS_VERSION = "RED5/1,3,28,0"; /** * Server capabilities diff --git a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java index c0a6371f6fa327cbe9048e6bb9e8945c3565f719..31005c3668338af482bf5d3f6b9d33dc01eac569 100755 --- a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java +++ b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java @@ -26,6 +26,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -1510,7 +1511,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa do { try { // DTS appears to be off only by < 10ms - Packet p = receivedPacketQueue.take(); + Packet p = receivedPacketQueue.poll(10000L, TimeUnit.MILLISECONDS); // wait for a packet up to 10 seconds if (p != null) { if (isTrace) { log.trace("Handle received packet: {}", p); @@ -1518,10 +1519,12 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa // pass message to the handler where any sorting or delays would need to be injected handler.messageReceived(conn, p); } + } catch (InterruptedException e) { + log.debug("Interrupted while waiting for message {} state: {}", sessionId, RTMP.states[getStateCode()], e); } catch (Exception e) { log.error("Error processing received message {} state: {}", sessionId, RTMP.states[getStateCode()], e); } - } while (!isClosed()); + } while (state.getState() < RTMP.STATE_ERROR); // keep processing unless we pass the error state receivedPacketFuture = null; receivedPacketQueue.clear(); }); diff --git a/common/src/main/java/org/red5/server/net/rtmp/RTMPMinaConnection.java b/common/src/main/java/org/red5/server/net/rtmp/RTMPMinaConnection.java index ca492004d2b9319711cf8f6f0084740aa1d7199a..9bf57b184f20148ce5962ffd38fca00f42ce86cb 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/RTMPMinaConnection.java +++ b/common/src/main/java/org/red5/server/net/rtmp/RTMPMinaConnection.java @@ -124,7 +124,7 @@ public class RTMPMinaConnection extends RTMPConnection implements RTMPMinaConnec if (future.isClosed()) { log.info("Connection is closed: {}", getSessionId()); if (log.isTraceEnabled()) { - log.trace("Session id - local: {} session: {}", getSessionId(), (String) ioSession.removeAttribute(RTMPConnection.RTMP_SESSION_ID)); + log.trace("Session id - local: {} session: {}", getSessionId(), (String) ioSession.getAttribute(RTMPConnection.RTMP_SESSION_ID)); } handler.connectionClosed(self); } else { @@ -352,7 +352,7 @@ public class RTMPMinaConnection extends RTMPConnection implements RTMPMinaConnec //if (log.isTraceEnabled()) { // log.trace("Write lock wait count: {} closed: {}", lock.getQueueLength(), isClosed()); //} - while (!isClosed()) { + while (state.getState() < RTMP.STATE_ERROR) { boolean acquired = false; try { acquired = lock.tryAcquire(10, TimeUnit.MILLISECONDS); @@ -392,7 +392,7 @@ public class RTMPMinaConnection extends RTMPConnection implements RTMPMinaConnec public void writeRaw(IoBuffer out) { if (ioSession != null) { final Semaphore lock = getLock(); - while (!isClosed()) { + while (state.getState() < RTMP.STATE_ERROR) { boolean acquired = false; try { acquired = lock.tryAcquire(10, TimeUnit.MILLISECONDS); diff --git a/io/pom.xml b/io/pom.xml index c20c7f409702f338f1771bd4243c5957b7cfdd16..42bab11355c6c1d205014a0f82bb8529ad3c88b3 100644 --- a/io/pom.xml +++ b/io/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.27</version> + <version>1.3.28</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-io</artifactId> diff --git a/pom.xml b/pom.xml index 1d79b74e20ea330da851def9ad3749369628e34f..a1b94a154a8c98bdcf35b80c3137b89158053e3f 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ <name>Red5</name> <description>The Red5 server</description> <groupId>org.red5</groupId> - <version>1.3.27</version> + <version>1.3.28</version> <url>https://github.com/Red5/red5-server</url> <inceptionYear>2005</inceptionYear> <organization> @@ -86,8 +86,10 @@ </developers> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <!-- to run unit tests use -Dmaven.test.skip=false --> - <!-- <maven.test.skip>true</maven.test.skip> --> + <!-- dont skip building the tests --> + <maven.test.skip>false</maven.test.skip> + <!-- skip running the tests --> + <skipTests>true</skipTests> <maven.compiler.source>1.11</maven.compiler.source> <maven.compiler.target>1.11</maven.compiler.target> <java.release.level>11</java.release.level> diff --git a/server/pom.xml b/server/pom.xml index f198668bf7e8867ef139aa96494f041579e0d4a9..271b782c6c7068c1aba40fd32d47479938335d7e 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.27</version> + <version>1.3.28</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-server</artifactId> diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java index a991f5aa0d2a6bd0b78a07c67e5f4b2d420e573a..a609bdb81f5ff862e14140dc7409fd4984e55402 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java @@ -27,6 +27,7 @@ import java.util.stream.Stream; import javax.websocket.Extension; import javax.websocket.Session; +import javax.websocket.RemoteEndpoint.Basic; import org.apache.commons.lang3.StringUtils; import org.apache.tomcat.websocket.Constants; @@ -186,41 +187,37 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We } // process the incoming string if (StringUtils.isNotBlank(data)) { - WsSession session = wsSession.get(); - if (session != null && session.isOpen()) { - if (isConnected()) { - try { - if (useAsync) { - if (sendFuture != null && !sendFuture.isDone()) { - try { - sendFuture.get(sendTimeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - log.warn("Send timed out {}", wsSessionId); - if (!isConnected()) { - sendFuture.cancel(true); - return; - } + final WsSession session = wsSession.get(); + // attempt send only if the session is not closed + if (session != null && !session.isClosed()) { + try { + if (useAsync) { + if (sendFuture != null && !sendFuture.isDone()) { + try { + sendFuture.get(sendTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + log.warn("Send timed out {}", wsSessionId); + // if the session is not open, cancel the future + if (!session.isOpen()) { + sendFuture.cancel(true); + return; } } - synchronized (wsSessionId) { - int lengthToWrite = data.getBytes().length; - sendFuture = session.getAsyncRemote().sendText(data); - updateWriteBytes(lengthToWrite); - } - } else { - synchronized (wsSessionId) { - int lengthToWrite = data.getBytes().length; - session.getBasicRemote().sendText(data); - updateWriteBytes(lengthToWrite); - } } - } catch (Exception e) { - if (isConnected()) { - log.warn("Send text exception", e); + synchronized (wsSessionId) { + int lengthToWrite = data.getBytes().length; + sendFuture = session.getAsyncRemote().sendText(data); + updateWriteBytes(lengthToWrite); + } + } else { + synchronized (wsSessionId) { + int lengthToWrite = data.getBytes().length; + session.getBasicRemote().sendText(data); + updateWriteBytes(lengthToWrite); } } - } else { - throw new IOException("WS connection closed"); + } catch (Exception e) { + log.warn("Send text exception", e); } } else { throw new IOException("WS session closed"); diff --git a/service/pom.xml b/service/pom.xml index 309df39395d34783974d1f9d44240820cd0be6e8..ff487b9e34c7da1a0a14f6ccb29c90a11ef2f2f1 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -3,7 +3,7 @@ <parent> <groupId>org.red5</groupId> <artifactId>red5-parent</artifactId> - <version>1.3.27</version> + <version>1.3.28</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>red5-service</artifactId>