diff --git a/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolDecoder.java b/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolDecoder.java index 5be39939912c7b3b173e1b24507f0c309c595a70..69b2e9c6ef361f33a70e577b400351a8b142fa4a 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolDecoder.java +++ b/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolDecoder.java @@ -72,7 +72,7 @@ public class RTMPMinaProtocolDecoder extends ProtocolDecoderAdapter { log.trace("Buffers info before: position {}, limit {}, remaining {}", new Object[] { buf.position(), buf.limit(), buf.remaining() }); } try { - // construct any objects from the decoded bugger + // construct any objects from the decoded buffer List<?> objects = decoder.decodeBuffer(conn, buf); log.trace("Decoded: {}", objects); if (objects != null) { diff --git a/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java b/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java index 6cb7336b866d8d9bea7b706cd67df5998b8dda1a..9366d6b5d712eef2871f5de481cbfac34204a3d1 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java +++ b/common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java @@ -69,6 +69,8 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { protected static final Logger log = LoggerFactory.getLogger(RTMPProtocolDecoder.class); + protected static final boolean isTrace = log.isTraceEnabled(), isDebug = log.isDebugEnabled(); + // close when header errors occur protected boolean closeOnHeaderError; @@ -90,8 +92,8 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { */ public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) { final int position = buffer.position(); - //if (log.isTraceEnabled()) { - //log.trace("decodeBuffer: {}", Hex.encodeHexString(Arrays.copyOfRange(buffer.array(), position, buffer.limit()))); + //if (isTrace) { + // log.trace("decodeBuffer: {}", Hex.encodeHexString(Arrays.copyOfRange(buffer.array(), position, buffer.limit()))); //} // decoded results List<Object> result = null; @@ -102,9 +104,9 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { result = new LinkedList<>(); // get the local decode state RTMPDecodeState state = conn.getDecoderState(); - //if (log.isTraceEnabled()) { - //log.trace("RTMP decode state {}", state); - //} + if (isTrace) { + log.trace("RTMP decode state {}", state); + } if (!conn.getSessionId().equals(state.getSessionId())) { log.warn("Session decode overlap: {} != {}", conn.getSessionId(), state.getSessionId()); } @@ -140,8 +142,8 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { // close connection because we can't parse data from it conn.close(); } finally { - //if (log.isTraceEnabled()) { - //log.trace("decodeBuffer - post decode input buffer position: {} remaining: {}", buffer.position(), buffer.remaining()); + //if (isTrace) { + // log.trace("decodeBuffer - post decode input buffer position: {} remaining: {}", buffer.position(), buffer.remaining()); //} buffer.compact(); } @@ -171,7 +173,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { * on error */ public Object decode(RTMPConnection conn, RTMPDecodeState state, IoBuffer in) throws ProtocolException { - //if (log.isTraceEnabled()) { + //if (isTrace) { //log.trace("Decoding for {}", conn.getSessionId()); //} try { @@ -194,7 +196,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { } catch (RuntimeException e) { throw new ProtocolException("Error during decoding", e); } finally { - //if (log.isTraceEnabled()) { + //if (isTrace) { //log.trace("Decoding finished for {}", conn.getSessionId()); //} } @@ -213,7 +215,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { */ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer in) { final int position = in.position(); - //if (log.isTraceEnabled()) { + //if (isTrace) { //log.trace("decodePacket - state: {} buffer: {}", state, in); //log.trace("decodePacket: position {}, limit {}, {}", position, in.limit(), Hex.encodeHexString(Arrays.copyOfRange(in.array(), position, in.limit()))); //log.trace("decodePacket: position {}, limit {}", position, in.limit()); @@ -236,7 +238,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { final int channelId = header != null ? header.getChannelId() : chunkHeader.getChannelId(); // header empty vs header null will return the NS_FAILED message if (header.isEmpty()) { - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Header was null or empty - chh: {}", chunkHeader); } // send a NetStream.Failed message @@ -250,20 +252,12 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { // ensure that we dont exceed maximum packet size int size = header.getSize(); log.debug("Packet size: {}", size); - /* XXX(paul): This is a hack to prevent OOM when decoding has failed in some way - if (size > MAX_PACKET_SIZE) { - // Reject packets that are too big, to protect against OOM when decoding has failed in some way - log.warn("Packet size exceeded. size={}, max={}, connId={}", size, MAX_PACKET_SIZE, conn.getSessionId()); - // send a NetStream.Failed message - StreamService.sendNetStreamStatus(conn, StatusCodes.NS_FAILED, "Data exceeded maximum allowed by " + (size - MAX_PACKET_SIZE) + " bytes", "no-name", Status.ERROR, conn.getStreamIdForChannelId(channelId)); - throw new ProtocolException(String.format("Packet size exceeded. size: %s", header.getSize())); - } - */ // get the size of our chunks int readChunkSize = rtmp.getReadChunkSize(); // check to see if this is a new packet or continue decoding an existing one Packet packet = rtmp.getLastReadPacket(channelId); if (packet == null) { + log.trace("Creating new packet"); // create a new packet packet = new Packet(header.clone()); // store the packet based on its channel id @@ -271,13 +265,13 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { } // get the packet data IoBuffer buf = packet.getData(); - //if (log.isTraceEnabled()) { - //log.trace("Source buffer position: {}, limit: {}, packet-buf.position {}, packet size: {}", new Object[] { in.position(), in.limit(), buf.position(), header.getSize() }); - //} + if (isTrace) { + log.trace("Source buffer position: {}, limit: {}, packet-buf.position {}, packet size: {}", in.position(), in.limit(), buf.position(), header.getSize()); + } // read chunk - int length = Math.min(buf.remaining(), readChunkSize); + int length = Math.max(buf.remaining(), readChunkSize); if (in.remaining() < length) { - //log.debug("Chunk too small, buffering ({},{})", in.remaining(), length); + log.debug("In buffer is too small, buffering ({},{})", in.remaining(), length); // how much more data we need to continue? state.bufferDecoding(in.position() - position + length); // we need to move back position so header will be available during another decode round @@ -286,17 +280,17 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { } // get the chunk from our input byte[] chunk = Arrays.copyOfRange(in.array(), in.position(), in.position() + length); - //if (log.isTraceEnabled()) { - //log.trace("Read chunkSize: {}, length: {}, chunk: {}", readChunkSize, length, Hex.encodeHexString(chunk)); - //} + if (isTrace) { + log.trace("Read chunkSize: {}, length: {}, chunk: {}", readChunkSize, length, Hex.encodeHexString(chunk)); + } // move the position in.skip(length); // put the chunk into the packet buf.put(chunk); if (buf.hasRemaining()) { - //if (log.isTraceEnabled()) { - //log.trace("Packet is incomplete ({},{})", buf.remaining(), buf.limit()); - //} + if (isTrace) { + log.trace("Packet is incomplete ({},{})", buf.remaining(), buf.limit()); + } return null; } // flip so we can read / decode the packet data into a message @@ -310,7 +304,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { // flash will send an earlier time stamp when resetting a video stream with a new key frame. To avoid dropping it, we give it the // minimal increment since the last message. To avoid relative time stamps being mis-computed, we don't reset the header we stored. message.setTimestamp(timestamp); - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Decoded message: {}", message); } packet.setMessage(message); @@ -329,7 +323,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { lastHeader.setTimerBase(timestamp); // clear the delta //lastHeader.setTimerDelta(0); - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Last read header after decode: {}", lastHeader); } } finally { @@ -354,7 +348,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { * @return Decoded header */ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in, RTMP rtmp, int startPostion) { - //if (log.isTraceEnabled()) { + //if (isTrace) { //log.trace("decodeHeader - chh: {} input: {}", chh, Hex.encodeHexString(Arrays.copyOfRange(in.array(), in.position(), in.limit()))); //log.trace("decodeHeader - chh: {}", chh); //} @@ -373,7 +367,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { } Header lastHeader = rtmp.getLastReadHeader(channelId); - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("{} lastHeader: {}", Header.HeaderType.values()[headerSize], lastHeader); } // got a non-new header for a channel which has no last-read header @@ -390,7 +384,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { return null; } } - // if (log.isTraceEnabled()) { + // if (isTrace) { // log.trace("headerLength: {}", headerLength); // } @@ -414,7 +408,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { } long ext = in.getUnsignedInt(); timeBase = (int) (ext ^ (ext >>> 32)); - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Extended time read: {}", timeBase); } header.setExtended(true); @@ -487,7 +481,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { } long ext = in.getUnsignedInt(); int timeExt = (int) (ext ^ (ext >>> 32)); - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Extended time read: {} {}", ext, timeExt); } timeBase = timeExt; @@ -542,7 +536,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { message = decodeAction(conn.getEncoding(), in, header); break; case TYPE_FLEX_STREAM_SEND: - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Decoding flex stream send on stream id: {}", header.getStreamId()); } // skip first byte @@ -551,7 +545,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { message = decodeStreamData(in.slice()); break; case TYPE_NOTIFY: - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Decoding notify on stream id: {}", header.getStreamId()); } if (header.getStreamId().doubleValue() != 0.0d) { @@ -616,7 +610,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { /** {@inheritDoc} */ public Unknown decodeUnknown(byte dataType, IoBuffer in) { - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("decodeUnknown: {}", dataType); } return new Unknown(dataType, in); @@ -784,7 +778,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { if (action == null) { throw new RuntimeException("Action was null"); } - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Action: {}", action); } // instance the invoke @@ -827,7 +821,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { */ public Ping decodePing(IoBuffer in) { Ping ping = null; - if (log.isTraceEnabled()) { + if (isTrace) { // gets the raw data as hex without changing the data or pointer String hexDump = in.getHexDump(); log.trace("Ping dump: {}", hexDump); @@ -881,7 +875,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { */ @SuppressWarnings("unchecked") public Notify decodeStreamData(IoBuffer in) { - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("decodeStreamData"); } // our result is a notify @@ -939,7 +933,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { params = Collections.EMPTY_MAP; } } - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("Dataframe: {} params: {}", onCueOrOnMeta, params.toString()); } IoBuffer buf = IoBuffer.allocate(64); @@ -967,27 +961,27 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { log.debug("Params type: {}", object); if (object == DataTypes.CORE_MAP) { params = (Map<Object, Object>) input.readMap(); - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("Map params: {}", params.toString()); } } else if (object == DataTypes.CORE_ARRAY) { params = (Map<Object, Object>) input.readArray(Object[].class); - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("Array params: {}", params); } } else if (object == DataTypes.CORE_STRING) { String str = input.readString(); - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("String params: {}", str); } params = new HashMap<>(); params.put("0", str); } else if (object == DataTypes.CORE_OBJECT) { params = (Map<Object, Object>) input.readObject(); - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("Object params: {}", params); } - } else if (log.isDebugEnabled()) { + } else if (isDebug) { log.debug("Stream send did not provide a parameter map"); } // need to debug this further @@ -1017,7 +1011,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { * @return FlexMessage event */ public FlexMessage decodeFlexMessage(IoBuffer in) { - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("decodeFlexMessage"); } // TODO: Unknown byte, probably encoding as with Flex SOs? @@ -1063,7 +1057,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { paramList.add(Deserializer.deserialize(input, Object.class)); } params = paramList.toArray(); - if (log.isTraceEnabled()) { + if (isTrace) { log.trace("Parameter count: {}", paramList.size()); for (int i = 0; i < params.length; i++) { log.trace(" > {}: {}", i, params[i]); @@ -1143,7 +1137,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { paramList.add(Deserializer.deserialize(input, Object.class)); } params = paramList.toArray(); - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("Num params: {}", paramList.size()); for (int i = 0; i < params.length; i++) { log.debug(" > {}: {}", i, params[i]); @@ -1159,7 +1153,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { */ public static void setMaxPacketSize(int maxPacketSize) { MAX_PACKET_SIZE = maxPacketSize; - if (log.isDebugEnabled()) { + if (isDebug) { log.debug("Max packet size: {}", MAX_PACKET_SIZE); } } diff --git a/server/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java b/server/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java index b06415f1ad1cd3da1b8b1da29b0ff5cf8819f44f..0d044bfa2d921cad5427d2ad5292eea3e22f116c 100644 --- a/server/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java +++ b/server/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java @@ -21,7 +21,6 @@ import org.red5.server.net.rtmp.RTMPConnection; import org.red5.server.net.rtmp.RTMPMinaConnection; import org.red5.server.net.rtmp.RTMPUtils; import org.red5.server.net.rtmp.event.Invoke; -import org.red5.server.net.rtmp.event.Notify; import org.red5.server.net.rtmp.message.ChunkHeader; import org.red5.server.net.rtmp.message.Header; import org.red5.server.net.rtmp.message.Packet; @@ -256,13 +255,13 @@ public class TestRTMPProtocolDecoder implements IRTMPHandler { log.debug("\n testNullJsonKV"); RTMPProtocolEncoder enc = new RTMPProtocolEncoder(); RTMPProtocolDecoder dec = new RTMPProtocolDecoder(); - RTMPMinaConnection conn = new RTMPMinaConnection(); - // RTMPMinaConnection conn = new RTMPMinaConnection() { - // @Override - // public Encoding getEncoding() { - // return Encoding.AMF3; - // } - // }; + //RTMPMinaConnection conn = new RTMPMinaConnection(); + RTMPMinaConnection conn = new RTMPMinaConnection() { + @Override + public Encoding getEncoding() { + return Encoding.AMF3; + } + }; conn.getState().setState(RTMP.STATE_CONNECTED); conn.setHandler(this); @@ -270,10 +269,6 @@ public class TestRTMPProtocolDecoder implements IRTMPHandler { String json = "{Server=NGINX RTMP (github.com/arut/nginx-rtmp-module), width=1280.0, height=720.0, displayWidth=1280.0, displayHeight=720.0, duration=0.0, framerate=30.0, fps=30.0, videodatarate=2500.0, videocodecid=7.0, audiodatarate=160.0, audiocodecid=10.0, profile=, level=}"; - Header header = new Header(); - header.setStreamId(1); - header.setDataType(Notify.TYPE_STREAM_METADATA); - IoBuffer bf = IoBuffer.allocate(128); bf.setAutoExpand(true); Output writer = new Output(bf); @@ -283,6 +278,9 @@ public class TestRTMPProtocolDecoder implements IRTMPHandler { writer.buf().flip(); /* using notify event using a Notify object + Header header = new Header(); + header.setStreamId(1); + header.setDataType(Notify.TYPE_STREAM_METADATA); Notify notify = new Notify(writer.buf()); notify.setSourceType(Constants.SOURCE_TYPE_LIVE); notify.setTimestamp(0); @@ -292,13 +290,24 @@ public class TestRTMPProtocolDecoder implements IRTMPHandler { IoBuffer encoded = enc.encodeStreamMetadata(notify); */ - IoBuffer encoded = writer.buf(); + //IoBuffer encoded = writer.buf(); + + IoBuffer encoded = IoBuffer.wrap(IOUtils.hexStringToByteArray( + "05000000000183120100000002000a6f6e4d6574614461746103000653657276657202002e4e47494e582052544d5020286769746875622e636f6d2f617275742f6e67696e782d72746d702d6d6f64756c65290005776964746800409e0000000000000006686569676874004090e00000000000000c646973706c6179576964746800409e000000000000000d646973706c6179486569676874004090e0000000000000086475726174696f6e00000000000000000000096672616d657261746500403e000000000000000366707300403e000000000000000d766964656f64617461726174650040b1940000000000000c766964656f636f646563696400401c000000000000000d617564696f6461746172617465004064000000000000000c617564696f636f6465636964004024000000000000000770726f66696c65020020000000000000000000000000000000000000000000000000000000000000000000056c6576656c0200200000000000000000000000000000000000000000000000000000000000000000000009")); byte[] copy = Arrays.copyOfRange(encoded.array(), encoded.arrayOffset(), encoded.remaining()); log.debug("Encoded: {}", Hex.encodeHexString(copy)); - Notify event = (Notify) dec.decodeMessage(conn, header, encoded); - log.debug("Decoded: {}", event.toString()); + List<?> objects = dec.decodeBuffer(conn, encoded); + log.info("Decoded: {}", objects); + if (objects != null) { + for (Object object : objects) { + log.info("Decoded object: {}", object); + } + } + + //Notify event = dec.decodePacket(encoded); + //log.debug("Decoded: {}", event.toString()); Red5.setConnectionLocal(null); } diff --git a/server/src/test/resources/logback-test.xml b/server/src/test/resources/logback-test.xml index 5e2da305acf1348682f0b5e4a96f2767f74c81ea..71228b40a60e4c3487b5ef04d6286bab240ac56a 100644 --- a/server/src/test/resources/logback-test.xml +++ b/server/src/test/resources/logback-test.xml @@ -19,12 +19,4 @@ <appender-ref ref="CONSOLE" /> <appender-ref ref="FILE" /> </root> -<!-- - <logger name="org.red5.server" level="TRACE"> - <appender-ref ref="FILE"/> - </logger> - <logger name="org.red5.server.net.rtmp" level="TRACE" /> - <logger name="org.springframework" level="INFO" /> - <logger name="org.apache" level="INFO" /> ---> </configuration> \ No newline at end of file