## Core Framework Samples
### Parsing an incoming message
When writing a network application, we often need to parse incoming bytes into application-specific messages.
The problem is that a chunk of data read from an NIO Connection may contain only part of an application message, or several messages at once. So how should a parsing Filter handle these scenarios?
- If we don’t have enough data to build an application message - we stop the FilterChain processing and store the incomplete buffer by returning
return filterChainContext.getStopAction(buffer);
- If the source Buffer contains more than one message at once - we’re splitting up the source Buffer into two chunks:
- The Buffer, which belongs to the first complete application message;
- The remainder.
The first chunk can be parsed immediately and the corresponding application message created. After parsing completes, we may want to instruct the FilterChain to continue execution by calling the next Filter in the chain, and to store the remainder to be processed later (once processing of this message is complete).
return filterChainContext.getInvokeAction(remainder);
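The two cases above can be sketched outside Grizzly with plain `java.nio.ByteBuffer`. This is a minimal illustration under stated assumptions, not Grizzly's implementation: the class `FramingSketch`, its methods, and the frame layout (a 4-byte big-endian length followed by the body) are hypothetical. An empty result plays the role of `getStopAction(buffer)`, and the bytes kept in `pending` play the role of the remainder passed to `getInvokeAction(remainder)`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone sketch of length-prefixed framing (not Grizzly code).
 * Assumed frame layout: 4-byte big-endian body length, then the body.
 */
final class FramingSketch {
    static final int LENGTH_FIELD_SIZE = 4;

    /** Bytes from earlier reads that did not yet form a complete message. */
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /**
     * Feeds one chunk read from the network and returns the body of every
     * message completed by it. An empty list is the "stop and store" case.
     */
    List<byte[]> onRead(byte[] chunk) {
        // Prepend the stored incomplete data to the new chunk.
        ByteBuffer data = ByteBuffer.allocate(pending.remaining() + chunk.length);
        data.put(pending).put(chunk);
        data.flip();

        List<byte[]> messages = new ArrayList<>();
        while (data.remaining() >= LENGTH_FIELD_SIZE) {
            // Absolute read: peek at the length without moving the position.
            int bodyLength = data.getInt(data.position());
            if (bodyLength < 0) {
                throw new IllegalArgumentException("negative body length: " + bodyLength);
            }
            if (data.remaining() < LENGTH_FIELD_SIZE + bodyLength) {
                break; // incomplete message: wait for more data
            }
            data.getInt(); // consume the length field
            byte[] body = new byte[bodyLength];
            data.get(body);
            messages.add(body);
        }
        // Whatever is left over is the remainder for the next read.
        pending = data.slice();
        return messages;
    }

    /** Number of bytes currently stored for the next read. */
    int pendingBytes() {
        return pending.remaining();
    }
}
```

Unlike this loop, a Grizzly Filter typically handles one message per invocation and hands the remainder back to the FilterChain, which re-invokes the Filter once the current message has been processed.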
Here is an example of a parsing Filter that parses the incoming Buffer and creates a GIOPMessage, which has the following structure:
public class GIOPMessage {
    private byte G;
    private byte I;
    private byte O;
    private byte P;
    private byte major;
    private byte minor;
    private byte flags;
    private byte value;
    private int bodyLength;
    private byte[] body;
}
The actual parser Filter code is then:
private static final int HEADER_SIZE = 12;

/**
 * Method is called when new data has been read from the Connection and is
 * ready to be processed.
 *
 * We override this method to perform Buffer -> GIOPMessage transformation.
 *
 * @param ctx Context of {@link FilterChainContext} processing
 * @return the next action
 * @throws java.io.IOException
 */
@Override
public NextAction handleRead(final FilterChainContext ctx) throws IOException {
    // Get the source buffer from the context
    final Buffer sourceBuffer = ctx.getMessage();
    final int sourceBufferLength = sourceBuffer.remaining();

    // If source buffer doesn't contain header
    if (sourceBufferLength < HEADER_SIZE) {
        // stop the filterchain processing and store sourceBuffer to be
        // used next time
        return ctx.getStopAction(sourceBuffer);
    }

    // Get the body length (the last 4 bytes of the header)
    final int bodyLength = sourceBuffer.getInt(HEADER_SIZE - 4);
    // The complete message length
    final int completeMessageLength = HEADER_SIZE + bodyLength;

    // If the source message doesn't contain entire body
    if (sourceBufferLength < completeMessageLength) {
        // stop the filterchain processing and store sourceBuffer to be
        // used next time
        return ctx.getStopAction(sourceBuffer);
    }

    // Check if the source buffer has more than 1 complete GIOP message
    // If yes - split up the first message and the remainder
    final Buffer remainder = sourceBufferLength > completeMessageLength ?
            sourceBuffer.split(completeMessageLength) : null;

    // Construct a GIOP message
    final GIOPMessage giopMessage = new GIOPMessage();
    // Set GIOP header bytes
    giopMessage.setGIOPHeader(sourceBuffer.get(), sourceBuffer.get(),
            sourceBuffer.get(), sourceBuffer.get());
    // Set major version
    giopMessage.setMajor(sourceBuffer.get());
    // Set minor version
    giopMessage.setMinor(sourceBuffer.get());
    // Set flags
    giopMessage.setFlags(sourceBuffer.get());
    // Set value
    giopMessage.setValue(sourceBuffer.get());
    // Set body length
    giopMessage.setBodyLength(sourceBuffer.getInt());
    // Read body
    final byte[] body = new byte[bodyLength];
    sourceBuffer.get(body);
    // Set body
    giopMessage.setBody(body);

    // Pass the parsed message to the next Filter in the chain
    ctx.setMessage(giopMessage);
    // We can try to dispose the buffer
    sourceBuffer.tryDispose();

    // Instruct FilterChain to store the remainder (if any) and continue execution
    return ctx.getInvokeAction(remainder);
}
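To make the header arithmetic concrete, here is a small standalone sketch of the 12-byte layout the parser relies on: four magic bytes, four single-byte fields, then a 4-byte body length at offset `HEADER_SIZE - 4`. It uses `java.nio.ByteBuffer` rather than Grizzly's `Buffer`, on the assumption that both treat an indexed `getInt` as an absolute read that leaves the position untouched. The class `GiopHeaderLayoutSketch` and its methods are hypothetical names introduced only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Standalone sketch of the 12-byte header layout (not Grizzly code). */
final class GiopHeaderLayoutSketch {
    static final int HEADER_SIZE = 12;

    /** Encodes magic "GIOP", major, minor, flags, value, body length, body. */
    static ByteBuffer encode(byte major, byte minor, byte flags, byte value, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.put("GIOP".getBytes(StandardCharsets.US_ASCII)) // offsets 0..3
           .put(major).put(minor).put(flags).put(value)      // offsets 4..7
           .putInt(body.length)                              // offsets 8..11
           .put(body);                                       // offset 12 onward
        buf.flip();
        return buf;
    }

    /**
     * Returns the total length of the message starting at the current
     * position, or -1 if the header is not fully available yet.
     * The buffer position is left unchanged.
     */
    static int completeMessageLength(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE) {
            return -1;
        }
        // Absolute read of the length field: does not move the position,
        // so later relative get() calls still start at the magic bytes.
        int bodyLength = buf.getInt(buf.position() + HEADER_SIZE - 4);
        return HEADER_SIZE + bodyLength;
    }
}
```

Because the length is peeked with an absolute read, the subsequent relative `get()` calls in the parser still consume the header from its first byte.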
### Asynchronous FilterChain execution
During FilterChain execution, we might want to continue IOEvent processing in a different (custom) thread.
For example, suppose we’ve chosen the SameThreadStrategy to avoid redundant thread context switches, and it fits our use case well. At the same time, some scenarios execute long-lasting tasks or use blocking I/O (for example, database access). To keep the entire server responsive, we’d want to execute such tasks in a custom thread pool and let Grizzly service other Connections.
This can be implemented with Grizzly using code like the following (not a working sample):
public NextAction handleRead(final FilterChainContext ctx) throws IOException {
    // Get the SuspendAction in advance, because once we execute the long-lasting
    // task in the custom thread, we lose control over the context
    final NextAction suspendAction = ctx.getSuspendAction();
    // suspend the current execution
    ctx.suspend();
    // schedule async work
    scheduler.schedule(new Runnable() {
        @Override
        public void run() {
            doLongLastingTask();
            // Resume the FilterChain IOEvent processing
            ctx.resumeNext();
        }
    }, 5, TimeUnit.SECONDS);
    // return suspend status
    return suspendAction;
}
Note that by instructing Grizzly to suspend the IOEvent execution, we become responsible for resuming the execution at some point (when the long-lasting task or blocking I/O operation completes):
public void run() {
    doLongLastingTask();
    // Resume the FilterChain IOEvent processing
    ctx.resumeNext();
}
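Since a suspended execution that is never resumed would leave the IOEvent hanging, it can help to guarantee that the resume step runs even when the long-lasting task throws. The sketch below shows one way to do that with the standard library only. `ResumeGuardSketch` and `resumeAfter` are hypothetical names, the `resume` callback stands in for `ctx.resumeNext()`, and the scheduler in `main` is just one possible way to create the `scheduler` used above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Standalone sketch (not Grizzly code): always resume after async work. */
final class ResumeGuardSketch {

    /**
     * Wraps a task so that the resume callback runs even if the task throws.
     * In a Filter, the callback would be something like ctx::resumeNext.
     */
    static Runnable resumeAfter(Runnable longLastingTask, Runnable resume) {
        return () -> {
            try {
                longLastingTask.run();
            } finally {
                resume.run();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // A custom pool for long-lasting or blocking work (size is arbitrary here).
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        scheduler.schedule(
                resumeAfter(
                        () -> System.out.println("long-lasting task"),
                        () -> System.out.println("resume processing")),
                100, TimeUnit.MILLISECONDS);
        // Stop accepting new tasks and wait for the scheduled one to finish.
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Whether a failed task should resume normal processing or report an error instead is an application decision; the sketch only ensures the execution is not left suspended indefinitely.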
### Other samples
The core framework samples can be reviewed in one of two ways:
- Directly from the git repository:
  git clone https://github.com/eclipse-ee4j/grizzly.git
  cd grizzly
  git checkout intial-contribution
  cd samples/framework-samples
- Download and unjar the sample source bundle from: https://repo1.maven.org/maven2/org/glassfish/grizzly/samples/grizzly-framework-samples/2.4.4/grizzly-framework-samples-2.4.4-sources.jar