001/*
002 * Copyright 2006 - 2013 Stefan Balev <stefan.balev@graphstream-project.org>
003 * Julien Baudry <julien.baudry@graphstream-project.org> Antoine Dutot
004 * <antoine.dutot@graphstream-project.org> Yoann Pigné
005 * <yoann.pigne@graphstream-project.org> Guilhelm Savin
006 * <guilhelm.savin@graphstream-project.org>
007 * 
008 * This file is part of GraphStream <http://graphstream-project.org>.
009 * 
010 * GraphStream is a library whose purpose is to handle static or dynamic graph,
011 * create them from scratch, file or any source and display them.
012 * 
013 * This program is free software distributed under the terms of two licenses,
014 * the CeCILL-C license that fits European law, and the GNU Lesser General
015 * Public License. You can use, modify and/ or redistribute the software under
016 * the terms of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the
017 * following URL <http://www.cecill.info> or under the terms of the GNU LGPL as
018 * published by the Free Software Foundation, either version 3 of the License,
019 * or (at your option) any later version.
020 * 
021 * This program is distributed in the hope that it will be useful, but WITHOUT
022 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
023 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
024 * details.
025 * 
026 * You should have received a copy of the GNU Lesser General Public License
027 * along with this program. If not, see <http://www.gnu.org/licenses/>.
028 * 
029 * The fact that you are presently reading this means that you have had
030 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms.
031 */
032package org.graphstream.stream.netstream;
033
034import java.io.IOException;
035import java.io.InputStream;
036import java.net.InetAddress;
037import java.net.InetSocketAddress;
038import java.net.UnknownHostException;
039import java.nio.ByteBuffer;
040import java.nio.channels.SelectionKey;
041import java.nio.channels.Selector;
042import java.nio.channels.ServerSocketChannel;
043import java.nio.channels.SocketChannel;
044import java.util.HashMap;
045import java.util.Iterator;
046import java.util.Set;
047
048import org.graphstream.stream.netstream.packing.NetStreamUnpacker;
049import org.graphstream.stream.thread.ThreadProxyPipe;
050import org.miv.mbox.net.PositionableByteArrayInputStream;
051
052/**
053 * <p>
054 * This class implements a receiver according to specifications the NetStream
055 * protocol.
056 * </p>
057 * 
058 * <p>
059 * See {@link NetStreamConstants} for a full description of the protocol, the
060 * sender and the receiver.
061 * </p>
062 * 
063 * @see NetStreamConstants
064 * @see NetStreamSender
065 * 
066 * 
067 *      Copyright (c) 2010 University of Luxembourg
068 * 
069 *      NetStreamReceiver.java
070 * @since Aug 13, 2011
071 * 
072 * @author Yoann Pigné
073 * 
074 */
075public class NetStreamReceiver extends Thread implements NetStreamDecoder {
076
077        /**
078         * the hostname this receiver is listening at.
079         */
080        private String hostname;
081
082        /**
083         * the port listened to.
084         */
085        private int port;
086
087        /**
088         * Receiver socket.
089         */
090        protected ServerSocketChannel server;
091
092        /**
093         * Multiplexor.
094         */
095        protected Selector selector;
096
097        /**
098         * Key for the selector.
099         */
100        protected SelectionKey key;
101
102        /**
103         * While true, the received is running.
104         */
105        protected boolean loop = true;
106
107        /**
108         * Show debugging messages.
109         */
110        protected boolean debug = true;
111
112        /**
113         * Last encountered error.
114         */
115        protected String lastError = null;
116
117        /**
118         * The current pipe commands are being written to.
119         */
120        protected ThreadProxyPipe currentStream;
121
122        /**
123         * Utility class that decodes messages according to the NetStream Protocol
124         */
125        protected NetStreamDecoder decoder;
126        
127
128        /**
129         * Current active incoming connections.
130         */
131        protected HashMap<SelectionKey, IncomingBuffer> incoming = new HashMap<SelectionKey, IncomingBuffer>();
132
133        class DefaultUnpacker extends NetStreamUnpacker {
134
135                @Override
136                public ByteBuffer unpackMessage(ByteBuffer buffer, int startIndex,
137                                int endIndex) {
138                        return buffer;
139                }
140
141                @Override
142                public int unpackMessageSize(ByteBuffer buffer) {
143                        return buffer.getInt();
144                }
145
146                /*
147                 * (non-Javadoc)
148                 * 
149                 * @see
150                 * org.graphstream.stream.netstream.packing.NetStreamUnpacker#sizeOfInt
151                 * ()
152                 */
153                @Override
154                public int sizeOfInt() {
155                        return 4;
156                }
157        };
158        private NetStreamUnpacker unpacker;
159
160        // Constructors
161
162        /**
163         * New NetStream Receiver, awaiting in its own thread at the given host name
164         * and port, for new graph events.
165         * 
166         * @param hostname
167         *            The host name to listen at messages.
168         * @param port
169         *            The port to listen at messages.
170         */
171        public NetStreamReceiver(String hostname, int port) throws IOException,
172                        UnknownHostException {
173                this(hostname, port, false);
174        }
175
176        /**
177         * New NetStream Receiver, awaiting in its own thread at "localhost" on the
178         * given port, for new graph events.
179         * 
180         * @param port
181         *            The port to listen at messages.
182         */
183        public NetStreamReceiver(int port) throws IOException, UnknownHostException {
184                this("localhost", port, false);
185        }
186
187        /**
188         * New NetStream Receiver, awaiting in its own thread at the given host name
189         * and port, for new graph events.
190         * 
191         * @param hostname
192         *            The host name to listen at messages.
193         * @param port
194         *            The port to listen at messages.
195         * @param debug
196         *            If true informations are output for each message received.
197         */
198        public NetStreamReceiver(String hostname, int port, boolean debug)
199                        throws IOException, UnknownHostException {
200                this.hostname = hostname;
201                this.port = port;
202                this.unpacker = new DefaultUnpacker();
203                this.decoder = new DefaultNetStreamDecoder();
204                setDebugOn(debug);
205                init();
206                start();
207        }
208
209        // Access
210
211        /**
212         * False as soon as the receiver terminates.
213         */
214        public synchronized boolean isRunning() {
215                return loop;
216        }
217
218
219        // Commands
220
221        /**
222         * Initialize the server socket.
223         */
224        protected void init() throws IOException, UnknownHostException {
225                selector = Selector.open();
226                server = ServerSocketChannel.open();
227
228                server.configureBlocking(false);
229
230                InetAddress ia = InetAddress.getByName(hostname);
231                InetSocketAddress isa = new InetSocketAddress(ia, port);
232
233                server.socket().bind(isa);
234
235                if (debug)
236                        debug("bound to socket %s:%d", server.socket().getInetAddress(),
237                                        server.socket().getLocalPort());
238
239                // Register a first server socket inside the multiplexer.
240
241                key = server.register(selector, SelectionKey.OP_ACCEPT);
242        }
243
244        /**
245         * Enable or disable debugging.
246         */
247        public void setDebugOn(boolean on) {
248                debug = on;
249                decoder.setDebugOn(on);
250        }
251
252
253        /**
254         * Stop the receiver.
255         */
256        public synchronized void quit() {
257                loop = false;
258                key.selector().wakeup();
259
260                if (debug)
261                        debug("stopped");
262        }
263
264        /**
265         * Ask the receiver about its active connections
266         */
267        public synchronized boolean hasActiveConnections() {
268                return !incoming.isEmpty();
269        }
270
271        /**
272         * Sets an optional NetStreamUnpaker whose "unpack" method will be called on
273         * each message.
274         * 
275         * It allows to do extra decoding on the all byte array message. You can
276         * also decrypt things.
277         * 
278         * @param unpaker
279         */
280        public void setUnpacker(NetStreamUnpacker unpaker) {
281                this.unpacker = unpaker;
282        }
283        public void removeUnpacker() {
284                unpacker = new DefaultUnpacker();
285        }
286
287        /**
288         * Wait for connections, accept them, demultiplexes them and dispatch
289         * messages to registered message boxes.
290         */
291        @Override
292        public void run() {
293                boolean l;
294
295                synchronized (this) {
296                        l = loop;
297                }
298
299                while (l) {
300                        poll();
301
302                        synchronized (this) {
303                                l = loop;
304                        }
305                }
306
307                try {
308                        server.close();
309                } catch (IOException e) {
310                        error("cannot close the server socket: " + e.getMessage(), e);
311                }
312
313                if (debug) {
314                        debug("receiver //" + hostname + ":" + port + " finished");
315                }
316        }
317
318        /**
319         * Wait until one or several chunks of message are acceptable. This method
320         * should be called in a loop. It can be used to block a program until some
321         * data is available.
322         */
323        public void poll() {
324                try {
325                        // Wait for incoming messages in a loop.
326
327                        if (key.selector().select() > 0) {
328                                Set<?> readyKeys = selector.selectedKeys();
329                                Iterator<?> i = readyKeys.iterator();
330
331                                while (i.hasNext()) {
332                                        SelectionKey akey = (SelectionKey) i.next();
333
334                                        i.remove();
335
336                                        if (akey.isAcceptable()) {
337                                                // If a new connection occurs, register the new socket
338                                                // in the multiplexer.
339
340                                                ServerSocketChannel ssocket = (ServerSocketChannel) akey
341                                                                .channel();
342                                                SocketChannel socket = ssocket.accept();
343
344                                                if (debug)
345                                                        debug("accepting socket %s:%d", socket.socket()
346                                                                        .getInetAddress(), socket.socket()
347                                                                        .getPort());
348
349                                                socket.configureBlocking(false);
350                                                socket.finishConnect();
351
352                                                // SelectionKey otherKey = socket.register( selector,
353                                                // SelectionKey.OP_READ );
354                                                socket.register(selector, SelectionKey.OP_READ);
355                                        } else if (akey.isReadable()) {
356                                                // If a message arrives, read it.
357
358                                                readDataChunk(akey);
359                                        } else if (akey.isWritable()) {
360                                                throw new RuntimeException("should not happen");
361                                        }
362                                }
363                        }
364                } catch (IOException e) {
365                        error(e, "I/O error in receiver //%s:%d thread: aborting: %s",
366                                        hostname, port, e.getMessage());
367
368                        loop = false;
369                } catch (Throwable e) {
370                        error(e, "Unknown error: %s", e.getMessage());
371
372                        loop = false;
373                }
374        }
375
376        /**
377         * When data is readable on a socket, send it to the appropriate buffer
378         * (creating it if needed).
379         */
380        protected void readDataChunk(SelectionKey key) throws IOException {
381                IncomingBuffer buf = incoming.get(key);
382
383                if (buf == null) {
384                        buf = new IncomingBuffer();
385                        incoming.put(key, buf);
386                        SocketChannel socket = (SocketChannel) key.channel();
387
388                        if (debug)
389                                debug("creating buffer for new connection from %s:%d", socket
390                                                .socket().getInetAddress(), socket.socket().getPort());
391                }
392
393                try {
394                        buf.readDataChunk(key);
395
396                } catch (IOException e) {
397                        incoming.remove(key);
398                        e.printStackTrace();
399                        error(e,
400                                        "receiver //%s:%d cannot read object socket channel (I/O error): %s",
401                                        hostname, port, e.getMessage());
402                        loop = false;
403                }
404
405                if (!buf.active) {
406                        incoming.remove(key);
407                        if (debug)
408                                debug("removing buffer %s from incoming for geting inactive. %d left",
409                                                key.toString(), incoming.size());
410
411                }
412
413        }
414
415        // Utilities
416
417        protected void error(String message, Object... data) {
418                error(null, message, data);
419        }
420
421        protected static final String LIGHT_YELLOW = "";
422        protected static final String RESET = "";
423
424        protected void error(Throwable e, String message, Object... data) {
425                // System.err.print( LIGHT_YELLOW );
426                System.err.print("[");
427                // System.err.print( RESET );
428                System.err.printf(message, data);
429                // System.err.print( LIGHT_YELLOW );
430                System.err.printf("]%n");
431                // System.err.println( RESET );
432
433                if (e != null)
434                        e.printStackTrace();
435        }
436
437        protected void debug(String message, Object... data) {
438                // System.err.print( LIGHT_YELLOW );
439                System.err.printf("[//%s:%d | ", hostname, port);
440                // System.err.print( RESET );
441                System.err.printf(message, data);
442                // System.err.print( LIGHT_YELLOW );
443                System.err.printf("]%n");
444                // System.err.println( RESET );
445        }
446
447        // Nested classes
448
449        /**
450         * The connection to a sender.
451         * 
452         * The receiver maintains several incoming connections and demultiplexes
453         * them.
454         */
455        protected class IncomingBuffer {
456                // Attributes
457
458                protected static final int BUFFER_INITIAL_SIZE = 8192; // 65535, 4096
459
460                /**
461                 * Buffer for reading.
462                 */
463                protected ByteBuffer buf = ByteBuffer.allocate(BUFFER_INITIAL_SIZE);
464
465                /**
466                 * Index in the buffer past the last byte that forms the current
467                 * message. End can be out of the buffer or out of the data read
468                 * actually.
469                 */
470                protected int end = -1;
471
472                /**
473                 * Index in the buffer of the first byte that forms the currents
474                 * message. Beg does not count the 4 bytes that give the size of the
475                 * message. While the header is being read, beg is the first byte of the
476                 * header.
477                 */
478                protected int beg = 0;
479
480                /**
481                 * Position inside beg and end past the last byte read. All bytes at and
482                 * after pos have unspecified contents. Pos always verifies pos&gt;=beg
483                 * and pos&lt;end. While the header is being read, pos is past the last
484                 * byte of the header that has been read.
485                 */
486                protected int pos = 0;
487
488                /**
489                 * Object input stream for reading the buffer. This input stream reads
490                 * data from the "bin" positionable byte array input stream, itself
491                 * mapped on the current message to decode.
492                 */
493                PositionableByteArrayInputStream in;
494
495                /**
496                 * Input stream filter on the buffer. This descendant of
497                 * ByteArrayInputStream is able to change its offset and length so that
498                 * we can map exactly the message to decode inside the buffer.
499                 */
500                PositionableByteArrayInputStream bin;
501
502                /**
503                 * When false the socket is closed and this buffer must be removed from
504                 * the active connections.
505                 */
506                protected boolean active = true;
507
508                // Constructors
509
510                public IncomingBuffer() {
511                }
512
513                // Commands
514
515                /**
516                 * Read the available bytes and buffers them. If one or more complete
517                 * serialised objects are available, send them to their respective
518                 * MBoxes.
519                 * 
520                 * Here is the junk...
521                 */
522                public void readDataChunk(SelectionKey key) throws IOException {
523                        int limit = 0; // Index past the last byte read during the current
524                                                        // invocation.
525                        int nbytes = 0; // Number of bytes read.
526                        SocketChannel socket = (SocketChannel) key.channel();
527
528                        int sizeOfInt = unpacker.sizeOfInt();
529                        // Buffers the data.
530
531                        nbytes = bufferize(pos, socket);
532                        limit = pos + nbytes;
533
534                        if (nbytes <= 0)
535                                return;
536
537                        if (debug) {
538                                debug("<chunk (%d bytes) from "
539                                                + socket.socket().getInetAddress() + ":"
540                                                + socket.socket().getPort() + ">", nbytes);
541                                int at = buf.position();
542                                for (int i = 0; i < nbytes; i++) {
543                                        System.err.printf("%d ", buf.get(at + i));
544                                }
545                                System.err.println();
546                                buf.position(at);
547                        }
548                        // Read the first header.
549
550                        if (end < 0) {
551                                if ((limit - beg) >= sizeOfInt) {
552                                        // If no data has been read yet in the buffer or if the
553                                        // buffer
554                                        // was emptied completely at previous call: prepare to read
555                                        // a
556                                        // new message by decoding its header.
557
558                                        buf.position(0);
559                                        int size = unpacker.unpackMessageSize(buf);
560                                        end = size + sizeOfInt;
561                                        beg = sizeOfInt;
562                                        if (debug)
563                                                debug("start to bufferize a %d byte long messsage",
564                                                                size);
565                                } else {
566                                        // The header is incomplete, wait next call to complete it.
567
568                                        pos = limit;
569                                }
570                        }
571
572                        // Read one or more messages or wait next call to buffers more.
573
574                        if (end > 0) {
575                                while (end < limit) {
576                                        // While the end of the message is in the limit of what was
577                                        // read, there are one or more complete messages. Decode
578                                        // them
579                                        // and read the header of the next message, until a message
580                                        // is
581                                        // incomplete or there are no more messages or a header is
582                                        // incomplete.
583
584                                        ByteBuffer unpackedBuffer = unpacker.unpackMessage(buf, beg, end);
585                                        if (unpackedBuffer == buf) {
586                                                in = new PositionableByteArrayInputStream(buf.array(), beg, end - beg);
587                                        } else {
588                                                in = new PositionableByteArrayInputStream(
589                                                                unpackedBuffer.array(), 0, unpackedBuffer.capacity());
590                                        }
591                                        
592                                        decoder.decodeMessage(in);
593                                        buf.position(end);
594
595                                        if (end + sizeOfInt <= limit) {
596                                                // There is a following message.
597
598                                                beg = end + sizeOfInt;
599                                                end = end + unpacker.unpackMessageSize(buf) + sizeOfInt;
600                                        } else {
601                                                // There is the beginning of a following message
602                                                // but the header is incomplete. Compact the buffer
603                                                // and stop here.
604                                                assert (beg >= sizeOfInt);
605
606                                                beg = end;
607                                                int p = sizeOfInt - ((end + sizeOfInt) - limit);
608                                                compactBuffer();
609                                                pos = p;
610                                                beg = 0;
611                                                end = -1;
612                                                break;
613                                        }
614                                }
615
616                                if (end == limit) {
617                                        // If the end of the message coincides with the limit of
618                                        // what
619                                        // was read we have one last complete message. We decode it
620                                        // and
621                                        // clear the buffer for the next call.
622
623                                        ByteBuffer unpackedBuffer = unpacker.unpackMessage(buf, beg, end);
624                                        if (unpackedBuffer == buf) {
625                                                in = new PositionableByteArrayInputStream(buf.array(), beg, end - beg);
626                                        } else {
627                                                in = new PositionableByteArrayInputStream(
628                                                                unpackedBuffer.array(), 0, unpackedBuffer.capacity());
629                                        }
630                                        
631                                        decoder.decodeMessage(in);
632                                        
633                                        buf.clear();
634                                        pos = 0;
635                                        beg = 0;
636                                        end = -1;
637                                } else if (end > limit) {
638                                        // If the end of the message if after what was read, prepare
639                                        // to
640                                        // read more at next call when we will have buffered more
641                                        // data. If we are at the end of the buffer compact it (else
642                                        // no
643                                        // more space will be available for buffering).
644
645                                        pos = limit;
646
647                                        if (end > buf.capacity())
648                                                compactBuffer();
649                                }
650                        }
651                }
652
653                /**
654                 * Read more data from the <code>socket</code> and put it in the buffer
655                 * at <code>at</code>. If the read returns -1 bytes (meaning the
656                 * connection ended), the socket is closed and this buffer will be made
657                 * inactive (and therefore removed from the active connections by the
658                 * Receiver that called it).
659                 * 
660                 * @return the number of bytes read.
661                 * @throws IOException
662                 *             if an I/O error occurs, in between the socket is closed
663                 *             and the connection is made inactive, then the exception
664                 *             is thrown.
665                 */
666                protected int bufferize(int at, SocketChannel socket)
667                                throws IOException {
668                        int nbytes = 0;
669                        // int limit = 0;
670
671                        try {
672                                buf.position(at);
673
674                                nbytes = socket.read(buf);
675
676                                if (nbytes < 0) {
677                                        active = false;
678                                        if (in != null)
679                                                in.close();
680                                        socket.close();
681                                        if (debug)
682                                                debug("socket from %s:%d closed", socket.socket()
683                                                                .getInetAddress(), socket.socket().getPort());
684                                        return nbytes;
685                                } else if (nbytes == 0) {
686                                        throw new RuntimeException(
687                                                        "should not happen: buffer to small, 0 bytes read: compact does not function? messages is larger than "
688                                                                        + buf.capacity() + "?");
689                                        // This means that there are no bytes remaining in the
690                                        // buffer... it is full.
691                                        // compactBuffer();
692                                        // return nbytes;
693                                }
694
695                                buf.position(at);
696
697                                return nbytes;
698                        } catch (IOException e) {
699                                if (debug)
700                                        debug("socket from %s:%d I/O error: %s", socket.socket()
701                                                        .getInetAddress(), socket.socket().getPort(),
702                                                        e.getMessage());
703                                active = false;
704                                if (in != null)
705                                        in.close();
706                                socket.close();
707                                throw e;
708                        }
709                }
710                
711
712                /**
713                 * Compact the buffer by removing all read data before <code>beg</code>.
714                 * The <code>beg</code>, <code>end</code> and <code>pos</code> markers
715                 * are updated accordingly. Compact works only if beg is larger than
716                 * four (the size of a header).
717                 * 
718                 * @return the offset.
719                 */
720                protected int compactBuffer() {
721                        if (beg > unpacker.sizeOfInt()) {
722                                int off = beg;
723
724                                buf.position(beg);
725                                buf.limit(buf.capacity());
726                                buf.compact();
727
728                                pos -= beg;
729                                end -= beg;
730                                beg = 0;
731
732                                return off;
733                        }
734
735                        return 0;
736                }
737
738                /**
739                 * Not used in the current implementation, we assumes that no message
740                 * will be larger than the size of the buffer.
741                 */
742                protected void enlargeBuffer() {
743                        ByteBuffer tmp = ByteBuffer.allocate(buf.capacity() * 2);
744
745                        buf.position(0);
746                        buf.limit(buf.capacity());
747                        tmp.put(buf);
748                        tmp.position(pos);
749
750                        buf = tmp;
751
752                        if (bin != null)
753                                bin.changeBuffer(buf.array());
754                }
755        }
756
757        /* (non-Javadoc)
758         * @see org.graphstream.stream.netstream.NetStreamDecoder#getStream(java.lang.String)
759         */
760        public ThreadProxyPipe getStream(String name) {
761                return decoder.getStream(name);
762        }
763
764        /* (non-Javadoc)
765         * @see org.graphstream.stream.netstream.NetStreamDecoder#getDefaultStream()
766         */
767        public ThreadProxyPipe getDefaultStream() {
768                return decoder.getDefaultStream();
769        }
770
771        /* (non-Javadoc)
772         * @see org.graphstream.stream.netstream.NetStreamDecoder#register(java.lang.String, org.graphstream.stream.thread.ThreadProxyPipe)
773         */
774        public void register(String name, ThreadProxyPipe stream) throws Exception {
775                decoder.register(name, stream);
776        }
777
778        /* (non-Javadoc)
779         * @see org.graphstream.stream.netstream.NetStreamDecoder#decodeMessage(java.io.InputStream)
780         */
781        public void decodeMessage(InputStream in) throws IOException {
782                decoder.decodeMessage(in);
783                
784        }
785
786
787}