diff --git a/common/pom.xml b/common/pom.xml index c6421d1310630887ee88272dd91293ef354b716f..594d9aa52c9cb11e74287173a497ad9da3ca020a 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -10,9 +10,6 @@ <name>Red5 :: Server Common</name> <description>Classes common for multiple red5 projects</description> <packaging>jar</packaging> - <properties> - <maven.test.skip>true</maven.test.skip> - </properties> <build> <defaultGoal>install</defaultGoal> <plugins> diff --git a/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java b/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java index 6e44ca95fbbe0a4989d6e262ae3be87ff71f5676..147d5fd73fbe3ae9a26a65f951005c7e62fafa2e 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java +++ b/common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java @@ -117,7 +117,10 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status case TYPE_NOTIFY: // like an invoke, but does not return anything and has a invoke / transaction id of 0 case TYPE_FLEX_STREAM_SEND: - if (((Notify) message).getData() != null && stream != null) { + Notify notify = (Notify) message; + if (notify.getCall() != null) { + onCommand(conn, channel, header, notify); + } else { // Stream metadata if (stream != null) { // dispatch to stream @@ -125,8 +128,6 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status // release / clean up message.release(); } - } else { - onCommand(conn, channel, header, (Notify) message); } break; case TYPE_PING: diff --git a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java index c2ceefa17576c897ade9ec39e0458c56bf7271d1..453c5f8045ec36e82f73010cb45fff90c499beff 100755 --- a/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java +++ b/common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java @@ -9,6 +9,7 @@ package org.red5.server.net.rtmp; import java.beans.ConstructorProperties; import java.beans.PropertyChangeEvent; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -524,7 +525,14 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa // start the handshake checker after maxHandshakeTimeout milliseconds if (scheduler != null) { try { - waitForHandshakeTask = scheduler.schedule(new WaitForHandshakeTask(), new Date(System.currentTimeMillis() + maxHandshakeTimeout)); + Date endDate = Date.from(Instant.now().plusMillis(maxHandshakeTimeout)); + if (isDebug) { + log.debug("Handshake timeout at: {}", endDate); + } + if (endDate != null) { + // schedule the task (if not already running + waitForHandshakeTask = scheduler.schedule(new WaitForHandshakeTask(), endDate); + } } catch (TaskRejectedException e) { if (isDebug) { log.warn("WaitForHandshake task was rejected for {}", sessionId, e); @@ -557,7 +565,13 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa } try { // schedule with an initial delay of now + 2s to prevent ping messages during connect post processes - keepAliveTask = scheduler.scheduleWithFixedDelay(new KeepAliveTask(), new Date(System.currentTimeMillis() + 2000L), pingInterval); + Date delayUntilDate = Date.from(Instant.now().plusMillis(2000)); + if (isDebug) { + log.debug("Keep alive delayed until: {}", delayUntilDate); + } + if (delayUntilDate != null) { + keepAliveTask = scheduler.scheduleWithFixedDelay(new KeepAliveTask(), delayUntilDate, pingInterval); + } if (isDebug) { log.debug("Keep alive scheduled for {}", sessionId); } @@ -1939,7 +1953,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa public void run() { if (isTrace) { - log.trace("WaitForHandshakeTask started for {}", getSessionId()); + log.trace("WaitForHandshakeTask started for {} at {}", getSessionId(), Instant.now()); } // check for connected state before disconnecting if (state.getState() != RTMP.STATE_CONNECTED) { diff --git a/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java b/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java index da4bfc8c24b4f41478a1a47a808990db95f04df9..6cb7336b866d8d9bea7b706cd67df5998b8dda1a 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java +++ b/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java @@ -896,7 +896,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { if (encoding == Encoding.AMF3) { log.trace("Client indicates its using AMF3"); } - //get the first datatype + // get the first datatype byte dataType = input.readDataType(); log.debug("Data type: {}", dataType); if (dataType == DataTypes.CORE_STRING) { diff --git a/io/src/main/java/org/red5/io/matroska/ParserUtils.java b/io/src/main/java/org/red5/io/matroska/ParserUtils.java index 89dfbc6ace43b5636a616e93153027f29e35ef79..7ed04bd54e2304ab262a9e3b148781e94b9f80f3 100644 --- a/io/src/main/java/org/red5/io/matroska/ParserUtils.java +++ b/io/src/main/java/org/red5/io/matroska/ParserUtils.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParserUtils { - + private static Logger log = LoggerFactory.getLogger(ParserUtils.class); public static final int BIT_IN_BYTE = 8; diff --git a/io/src/main/java/org/red5/io/matroska/dtd/TagFactory.java b/io/src/main/java/org/red5/io/matroska/dtd/TagFactory.java index 32d48d2a7a83dd60d438057d0efc6ccfaf73b652..2673403deb5e1c98988feb11b0de8b0b15f81860 100644 --- a/io/src/main/java/org/red5/io/matroska/dtd/TagFactory.java +++ b/io/src/main/java/org/red5/io/matroska/dtd/TagFactory.java @@ -19,8 +19,8 @@ import org.slf4j.LoggerFactory; /** * https://www.matroska.org/technical/tagging.html - * - * factory for creating matroska tags, it use property file - matroska_type_definition_config.properties with structure: + * + * factory for creating matroska tags, it use property file - matroska_type_definition_config.properties with structure: * long id = "name provided specification","java class representing tag data" */ public class TagFactory { @@ -58,7 +58,7 @@ public class TagFactory { tag = (Tag) nt.clazz.getConstructor(String.class, VINT.class, VINT.class, InputStream.class).newInstance(nt.name, id, size, inputStream); } catch (Exception e) { log.error("Unexpected exception while creating tag", e); - } + } } else { log.info("Unsupported matroska tag: {} {}", id, id.getBinary()); //throw new ConverterException("not supported matroska tag: " + id.getBinary()); diff --git a/io/src/main/java/org/red5/io/object/Deserializer.java b/io/src/main/java/org/red5/io/object/Deserializer.java index a9a870057ecdc110b236afe2ecb5272e92dfb9b9..c1533fd8b52c5969c46a53b7617326bf8077c926 100644 --- a/io/src/main/java/org/red5/io/object/Deserializer.java +++ b/io/src/main/java/org/red5/io/object/Deserializer.java @@ -29,7 +29,7 @@ public class Deserializer { private static final Logger log = LoggerFactory.getLogger(Deserializer.class); - private static Set<String> BLACK_LIST; + private static Set<String> BLACK_LIST = Collections.emptySet(); private Deserializer() { } diff --git a/io/src/test/java/org/red5/io/webm/WebmTest.java b/io/src/test/java/org/red5/io/webm/WebmTest.java index cbf77042c0dff0e5267569582c318a88f800b0a0..9fbf028554d7850bc09de4dc1f4580ad09d51328 100644 --- a/io/src/test/java/org/red5/io/webm/WebmTest.java +++ b/io/src/test/java/org/red5/io/webm/WebmTest.java @@ -9,7 +9,6 @@ package org.red5.io.webm; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; import java.io.File; import java.io.FileInputStream; diff --git a/server/src/main/java/org/red5/server/net/rtmpe/RTMPEIoFilter.java b/server/src/main/java/org/red5/server/net/rtmpe/RTMPEIoFilter.java index 24e6efa0c6fa100d0310010eb856bbfacd9927a1..6c0eb413a0752ca7498e9d047395caeecbc3af84 100644 --- a/server/src/main/java/org/red5/server/net/rtmpe/RTMPEIoFilter.java +++ b/server/src/main/java/org/red5/server/net/rtmpe/RTMPEIoFilter.java @@ -15,6 +15,7 @@ import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.filterchain.IoFilterAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.write.WriteRequest; +import org.bouncycastle.util.encoders.Hex; import org.red5.server.net.rtmp.InboundHandshake; import org.red5.server.net.rtmp.RTMPConnManager; import org.red5.server.net.rtmp.RTMPConnection; @@ -144,7 +145,10 @@ public class RTMPEIoFilter extends IoFilterAdapter { IoBuffer message = buffer.getBufferAsIoBuffer(); // assuming majority of connections will not be encrypted if (!((RTMPConnection) conn).isEncrypted()) { - log.trace("Receiving message: {}", message); + if (isTrace) { + //log.trace("Receiving message: {}", message); + log.trace(sessionId + " Receiving message: {}", Hex.toHexString(message.array(), message.arrayOffset(), message.remaining())); + } nextFilter.messageReceived(session, message); } else { Cipher cipher = (Cipher) session.getAttribute(RTMPConnection.RTMPE_CIPHER_IN); diff --git a/common/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java b/server/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java similarity index 96% rename from common/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java rename to server/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java index 704db4cc12526a16773962fb244738c929c904c9..b06415f1ad1cd3da1b8b1da29b0ff5cf8819f44f 100644 --- a/common/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java +++ b/server/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java @@ -7,18 +7,21 @@ import static org.junit.Assert.assertNotNull; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; +import java.util.Arrays; import java.util.List; +import org.apache.commons.codec.binary.Hex; import org.apache.mina.core.buffer.IoBuffer; -import org.junit.After; -import org.junit.Before; import org.junit.Test; +import org.red5.io.amf.Output; import org.red5.io.utils.IOUtils; +import org.red5.server.api.Red5; import org.red5.server.net.rtmp.IRTMPHandler; import org.red5.server.net.rtmp.RTMPConnection; import org.red5.server.net.rtmp.RTMPMinaConnection; import org.red5.server.net.rtmp.RTMPUtils; import org.red5.server.net.rtmp.event.Invoke; +import org.red5.server.net.rtmp.event.Notify; import org.red5.server.net.rtmp.message.ChunkHeader; import org.red5.server.net.rtmp.message.Header; import org.red5.server.net.rtmp.message.Packet; @@ -29,14 +32,6 @@ public class TestRTMPProtocolDecoder implements IRTMPHandler { protected Logger log = LoggerFactory.getLogger(TestRTMPProtocolDecoder.class); - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - @Test public void testDecodeChannelId() { { @@ -249,15 +244,65 @@ public class TestRTMPProtocolDecoder implements IRTMPHandler { //byte[] buf = IOUtils.hexStringToByteArray("c62bc8fb0ff2f4705368de8fc45af6bca8568b90ff593764ac332775fef77e8827b76edfc9d8e2ff9fe22cbb7c94c5fb0b4130970474bf74be4af768961f05193e6ebd7bc5ddf76d597bd672f788d1affdf94c5709d2e8bae6d80bd425a2e719f2b2747da1390d5230f7f8bebd17baa50ac98539bd7b11ea977c57a482f97261f9c622d499fb9bd3e2b8a24b9b957abeb57df6b94a12b36983d02f242740e8bb3f8cb2fb540d1d8767f4fb2ba4f7c13d7e5b7fa115fb03b4aab5e6f197c9ee228a6ebd14177aafeac75dfad6ae621ba0fe27d57f5d2ae4ecfb7f1ddab0b47a0130adbb27617af93df5826a63eef7e8b2a549736b5f175f7d49f9b7bf2f860b5ea50cc679ab9bbd4849397d49424debe4af2cbfbf55be6d72505fc9beb9affbacb5df5f97bc11f378c08276239d210407bf9b6b525f5eff37bf93dfc57635d56a27135eb588e5efd85258cd565dfb27aa975b1745f5f97af2eb9377e796ff5f9b5c977ee08a9847543230beba557fdf15ef7fb1fbfddddfa43bdfbeefa4ebd5a939b7fdc6efec8effcbea6eeabfcbcdd9edd427a5bf5b2597eaad97d7576baaf2e6e4af043a36515bec83f41bef74ff146f77ebbdff11ef7f4fd16ff423a45d3c4f2f5be6f5e325ca6ec0fe4a2fb752a9765d17994cc7eeb89f457c10719ee5fde780adfebf5cd7ea8827beff93df90bdeb8bdfbbbf6ef7fab3e5aff09dfbde2737c5d7afc65c9ebdc9cdbfe276be83f047d80fdf095fa5f979bf37aed572760725b932539abe5d0587b93d0fcf541ffeeb2f72683fdf7f9bbfcd7fe5af522fbbfd93c4137cb853c74dbae5f5e5aa42ba0f61fad66ae4bf57aacd46f156d19b157edec61ed1ea9ff7989bfeeff526fe989eeeffcdbfbfcdaeb88ddeeffbdf788e2882fdf7e7c637fcf5cddeacc6efe4932baad7a096c3fa7cb542391794b41ff2f4ee61c4dacd96af2e8dbe1ce6eebfeeb35d309f7efd17e23a2ef7413acc6d7f09eaabd7955ab2f3164b37bf57852b6ebd595faed551bd7cbdb9615f63af15045c9641c85707b1fc84f5722f92ffc21dfba0aff84bbdf928475e4fdebf8ad7d7c95cb7f8a92bc658edfc9a0fe5e717f7b79b527820e6f37f13d6431c754aad75ed8fd81ef607415de97f3d457ad6fe2fd57f8434eabd97f7bf924a7d59fe5efe79b11bb32fab496249eafabfcdee49f139bcd0e708d79dbce7cc0803d2f554abfc9ed6d1341a0fb5b5314bbeb293df96b97d3f37af84e87d817f7be491bedfcbbdf4ab9bd12937c6422498d520c8ff0ee9e88b8ffe6f7ea4d7cddd3f0968bd00fe2af777f649d79cb4fbbfcd5b0fe5d555727be8f597c9d1fa5d1bd1287f64a995512690979a3edc31cde4101eab2cb4c99460fb8cf7417569bd4cc575edfad14577b08fad7afa2bf6fcbbfa13deb934f27277aef7f5f92fd7277f97b7d237ba9be4e50875f89f6ec6de3b3435c9ec1ae6f4f59290ff7bbde5a7c64b4bdee806f46d5273e761cf635dfebbf752fd9bad671fee7673d9556f46f5d45fabd6b31bd7a2faaeeefaa7f2f7e8e5eb593d12fd586a3f1908404400001500018a08af01211a148daa9cc2419144cfb7bebdb53591ed92eab8cb66b2f552a5015a0668a6bfbc529401d5e5f3cc6904ad81d1bdb59edb0dc9380ece08ad9282ba6be12650fe0f402902938d3ddbc59758e3299368c4123a974b3665c3f94d13d72ee01055635511860b074e03c4028b267160908c2496b3b73593b230d94a99832629c417001ed355359e10ab091441b4a8f8a4332d35e56ac7802eb95070e58c7e9a81c466eca32d7d20bc88191e31282d0c8d0c70a1b2e3ae28c810d95231ee1564e842de4b1c1cb565b99ee0dadd09e668b8808fa7c198c85143f6dd79e1792fe32b5b7196df157a5454a020168e071109c5420456883d9317a80a02162ba1898547c4d9fd2612b124e7f2ac80bb3836f1e3155e425ac0e48048f99821878884662268c43ba09b86588426c42cee2e069e6c593eff0e98d76b630f658d35c918f76b7d36bdee6428cee5314f6b2e2c8c15954ac2844e16e4b8237baa728349f6aaad8843865f92e5f7cf35820fcd434dcc6bbde44d0ae4b9d2532c45d2f8cb97e642d1c483317d85d50d5236030704ffffff000033120100000001"); byte[] buf = IOUtils.hexStringToByteArray("03ffffff00004b090100000005584fce270100002800000042419e1e45152c236f0000030000030000030000030000030000030000030000030000030000030000030000030000030000030000030000030000030000030000049c03ffffff000008080100000005584fd1af01211004608c1c03ffffff000049090100000005"); System.out.println("" + buf.length); - //RTMPConnection conn = RTMPConnManager.getInstance().createConnection(RTMPMinaConnection.class); RTMPConnection conn = new RTMPMinaConnection(); conn.getState().setState(RTMP.STATE_CONNECTED); conn.setHandler(this); - //RTMPClientProtocolDecoder decoder = new RTMPClientProtocolDecoder(); RTMPProtocolDecoder decoder = new RTMPProtocolDecoder(); decoder.decodeBuffer(conn, IoBuffer.wrap(buf)); } + @Test + public void testNullJsonKV() { + log.debug("\n testNullJsonKV"); + RTMPProtocolEncoder enc = new RTMPProtocolEncoder(); + RTMPProtocolDecoder dec = new RTMPProtocolDecoder(); + RTMPMinaConnection conn = new RTMPMinaConnection(); + // RTMPMinaConnection conn = new RTMPMinaConnection() { + // @Override + // public Encoding getEncoding() { + // return Encoding.AMF3; + // } + // }; + conn.getState().setState(RTMP.STATE_CONNECTED); + conn.setHandler(this); + + Red5.setConnectionLocal(conn); + + String json = "{Server=NGINX RTMP (github.com/arut/nginx-rtmp-module), width=1280.0, height=720.0, displayWidth=1280.0, displayHeight=720.0, duration=0.0, framerate=30.0, fps=30.0, videodatarate=2500.0, videocodecid=7.0, audiodatarate=160.0, audiocodecid=10.0, profile=, level=}"; + + Header header = new Header(); + header.setStreamId(1); + header.setDataType(Notify.TYPE_STREAM_METADATA); + + IoBuffer bf = IoBuffer.allocate(128); + bf.setAutoExpand(true); + Output writer = new Output(bf); + String action = "onMetaData"; + writer.writeString(action); + writer.writeObject(json); + writer.buf().flip(); + + /* using notify event using a Notify object + Notify notify = new Notify(writer.buf()); + notify.setSourceType(Constants.SOURCE_TYPE_LIVE); + notify.setTimestamp(0); + notify.setAction(action); + notify.setHeader(header); + + IoBuffer encoded = enc.encodeStreamMetadata(notify); + */ + + IoBuffer encoded = writer.buf(); + + byte[] copy = Arrays.copyOfRange(encoded.array(), encoded.arrayOffset(), encoded.remaining()); + log.debug("Encoded: {}", Hex.encodeHexString(copy)); + + Notify event = (Notify) dec.decodeMessage(conn, header, encoded); + log.debug("Decoded: {}", event.toString()); + + Red5.setConnectionLocal(null); + } + @Override public void connectionOpened(RTMPConnection conn) { log.debug("connectionOpened - conn: {}", conn);