001/*
002 * Copyright 2006 - 2013
003 *     Stefan Balev     <stefan.balev@graphstream-project.org>
004 *     Julien Baudry    <julien.baudry@graphstream-project.org>
005 *     Antoine Dutot    <antoine.dutot@graphstream-project.org>
006 *     Yoann Pigné      <yoann.pigne@graphstream-project.org>
007 *     Guilhelm Savin   <guilhelm.savin@graphstream-project.org>
008 * 
009 * This file is part of GraphStream <http://graphstream-project.org>.
010 * 
011 * GraphStream is a library whose purpose is to handle static or dynamic
012 * graph, create them from scratch, file or any source and display them.
013 * 
014 * This program is free software distributed under the terms of two licenses, the
015 * CeCILL-C license that fits European law, and the GNU Lesser General Public
016 * License. You can  use, modify and/ or redistribute the software under the terms
017 * of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the following
018 * URL <http://www.cecill.info> or under the terms of the GNU LGPL as published by
019 * the Free Software Foundation, either version 3 of the License, or (at your
020 * option) any later version.
021 * 
022 * This program is distributed in the hope that it will be useful, but WITHOUT ANY
023 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
024 * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
025 * 
026 * You should have received a copy of the GNU Lesser General Public License
027 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
028 * 
029 * The fact that you are presently reading this means that you have had
030 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms.
031 */
032package org.graphstream.stream.netstream;
033
034import java.io.BufferedOutputStream;
035import java.io.IOException;
036import java.net.Socket;
037import java.net.UnknownHostException;
038import java.nio.ByteBuffer;
039import java.nio.charset.Charset;
040
041import org.graphstream.stream.Sink;
042import org.graphstream.stream.netstream.packing.NetStreamPacker;
043
044/**
045 * <p>
046 * This class implements a sender according to specifications the NetStream
047 * protocol.
048 * </p>
049 * 
050 * <p>
051 * See {@link NetStreamConstants} for a full description of the protocol, the
052 * sender and the receiver.
053 * </p>
054 * 
055 * @see NetStreamConstants
056 * @see NetStreamReceiver
057 * 
058 * 
059 *      Copyright (c) 2010 University of Luxembourg
060 * 
061 *      NetStreamSender.java
062 * @since Aug 10, 2011
063 * 
064 * @author Yoann Pigné
065 * 
066 */
067public class NetStreamSender implements Sink {
068        private static ByteBuffer NULL_BUFFER = ByteBuffer.allocate(0);
069        
070        protected String stream;
071        protected ByteBuffer streamBuffer;
072        byte[] streamIdArray;
073        protected String host;
074        protected int port;
075        protected Socket socket;
076        protected BufferedOutputStream out;
077
078        protected String sourceId = "";
079        protected ByteBuffer sourceIdBuff;
080
081        class DefaultPacker extends NetStreamPacker {
082                ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
083
084                @Override
085                public ByteBuffer packMessage(ByteBuffer buffer, int startIndex,
086                                int endIndex) {
087                        return buffer;
088                }
089
090                @Override
091                public ByteBuffer packMessageSize(int capacity) {
092                        sizeBuffer.rewind();
093                        sizeBuffer.putInt(capacity);
094                        return sizeBuffer;
095                }
096
097        };
098
099        protected NetStreamPacker packer = new DefaultPacker();
100
101        public NetStreamSender(String host, int port) throws UnknownHostException,
102                        IOException {
103                this("default", host, port);
104        }
105        public NetStreamSender(int port) throws UnknownHostException, IOException {
106                this("default", "localhost", port);
107        }
108
109        public NetStreamSender(String stream, String host, int port)
110                        throws UnknownHostException, IOException {
111                this.stream = stream;
112                this.host = host;
113                this.port = port;
114                setStream(stream);
115                
116                connect();
117                
118        }
119        
120        /**
121         * @param stream2
122         */
123        public void setStream(String stream) {
124                streamIdArray = stream.getBytes(Charset.forName("UTF-8"));
125                streamBuffer = encodeString(stream);
126                
127                
128        }
129        public NetStreamSender(Socket socket) throws IOException {
130                this("default", socket);
131        }
132        
133        public NetStreamSender(String stream, Socket socket) throws IOException {
134                this.host = socket.getInetAddress().getHostName();
135                this.port = socket.getPort();
136                this.socket = socket;
137                this.out = new BufferedOutputStream(socket.getOutputStream());
138                this.streamIdArray = stream.getBytes(Charset.forName("UTF-8"));
139        }
140
141        /**
142         * Sets an optional NetStreamPaker whose "pack" method will be called on
143         * each message.
144         * 
145         * a Packer can do extra encoding on the all byte array message, it may also
146         * crypt things.
147         * 
148         * @param paker
149         *            The packer object
150         */
151        public void setPacker(NetStreamPacker paker) {
152                this.packer = paker;
153        }
154        public void removePacker() {
155                packer = new DefaultPacker();
156        }
157
158        protected void connect() throws UnknownHostException, IOException {
159
160                socket = new Socket(host, port);
161                out = new BufferedOutputStream(socket.getOutputStream());
162
163        }
164
165        protected int getType(Object value) {
166                int valueType = NetStreamConstants.TYPE_UNKNOWN;
167                
168                if (value == null)
169                        return NetStreamConstants.TYPE_NULL;
170                
171                Class<?> valueClass = value.getClass();
172                boolean isArray = valueClass.isArray();
173                if (isArray) {
174                        valueClass = ((Object[]) value)[0].getClass();
175                }
176                if (valueClass.equals(Boolean.class)) {
177                        if (isArray) {
178                                valueType = NetStreamConstants.TYPE_BOOLEAN_ARRAY;
179                        } else {
180                                valueType = NetStreamConstants.TYPE_BOOLEAN;
181                        }
182                } else if (valueClass.equals(Byte.class)) {
183                        if (isArray) {
184                                valueType = NetStreamConstants.TYPE_BYTE_ARRAY;
185                        } else {
186                                valueType = NetStreamConstants.TYPE_BYTE;
187                        }
188                } else if (valueClass.equals(Short.class)) {
189                        if (isArray) {
190                                valueType = NetStreamConstants.TYPE_SHORT_ARRAY;
191                        } else {
192                                valueType = NetStreamConstants.TYPE_SHORT;
193                        }
194                } else if (valueClass.equals(Integer.class)) {
195                        if (isArray) {
196                                valueType = NetStreamConstants.TYPE_INT_ARRAY;
197                        } else {
198                                valueType = NetStreamConstants.TYPE_INT;
199                        }
200                } else if (valueClass.equals(Long.class)) {
201                        if (isArray) {
202                                valueType = NetStreamConstants.TYPE_LONG_ARRAY;
203                        } else {
204                                valueType = NetStreamConstants.TYPE_LONG;
205                        }
206                } else if (valueClass.equals(Float.class)) {
207                        if (isArray) {
208                                valueType = NetStreamConstants.TYPE_FLOAT_ARRAY;
209                        } else {
210                                valueType = NetStreamConstants.TYPE_FLOAT;
211                        }
212                } else if (valueClass.equals(Double.class)) {
213                        if (isArray) {
214                                valueType = NetStreamConstants.TYPE_DOUBLE_ARRAY;
215                        } else {
216                                valueType = NetStreamConstants.TYPE_DOUBLE;
217                        }
218                } else if (valueClass.equals(String.class)) {
219                        if (isArray) {
220                                valueType = NetStreamConstants.TYPE_ARRAY;
221                        } else {
222                                valueType = NetStreamConstants.TYPE_STRING;
223                        }
224                } else 
225                        System.err.printf("[warning] can not find type of %s\n", valueClass);
226                // System.out.println("ValueType="+valueType+" "+value.getClass());
227                return valueType;
228        }
229        
230        protected ByteBuffer encodeValue(Object in, int valueType) {
231
232                if (NetStreamConstants.TYPE_BOOLEAN == valueType) {
233                        return encodeBoolean(in);
234                } else if (NetStreamConstants.TYPE_BOOLEAN_ARRAY == valueType) {
235                        return encodeBooleanArray(in);
236                } else if (NetStreamConstants.TYPE_BYTE == valueType) {
237                        return encodeByte(in);
238                } else if (NetStreamConstants.TYPE_BYTE_ARRAY == valueType) {
239                        return encodeByteArray(in);
240                } else if (NetStreamConstants.TYPE_SHORT == valueType) {
241                        return encodeShort(in);
242                } else if (NetStreamConstants.TYPE_SHORT_ARRAY == valueType) {
243                        return encodeShortArray(in);
244                } else if (NetStreamConstants.TYPE_INT == valueType) {
245                        return encodeInt(in);
246                } else if (NetStreamConstants.TYPE_INT_ARRAY == valueType) {
247                        return encodeIntArray(in);
248                } else if (NetStreamConstants.TYPE_LONG == valueType) {
249                        return encodeLong(in);
250                } else if (NetStreamConstants.TYPE_LONG_ARRAY == valueType) {
251                        return encodeLongArray(in);
252                } else if (NetStreamConstants.TYPE_FLOAT == valueType) {
253                        return encodeFloat(in);
254                } else if (NetStreamConstants.TYPE_FLOAT_ARRAY == valueType) {
255                        return encodeFloatArray(in);
256                } else if (NetStreamConstants.TYPE_DOUBLE == valueType) {
257                        return encodeDouble(in);
258                } else if (NetStreamConstants.TYPE_DOUBLE_ARRAY == valueType) {
259                        return encodeDoubleArray(in);
260                } else if (NetStreamConstants.TYPE_STRING == valueType) {
261                        return encodeString(in);
262                } else if (NetStreamConstants.TYPE_ARRAY == valueType) {
263                        return encodeArray(in);
264                } else if (NetStreamConstants.TYPE_NULL == valueType) {
265                        return NULL_BUFFER;
266                }
267                
268                System.err.printf("[warning] unknown value type %d\n", valueType);
269                
270                return null;
271        }
272
273        /**
274         * @param in
275         * @return
276         */
277        protected ByteBuffer encodeArray(Object in) {
278                // TODO...
279                return null;
280        }
281
282        /*
283        private void outBuffer(ByteBuffer buf){
284                System.out.println(buf.toString());
285                int nbytes = buf.capacity();
286                int at = buf.position();
287                for(int i=0; i< nbytes; i++){
288                        int bt = buf.get(at+i);
289                        if (bt < 0) bt = (bt & 127) + (bt & 128); 
290                        System.out.printf("%d ", bt);
291                }
292                System.out.println();
293        }*/
294        
295        /**
296         * @param in
297         * @return
298         */
299        protected ByteBuffer encodeString(Object in) {
300                //System.out.println("They want me to encode this string: "+in);
301                String s = (String) in;
302                byte[] data = s.getBytes(Charset.forName("UTF-8"));
303                
304                ByteBuffer lenBuff = encodeUnsignedVarint(data.length);
305                //outBuffer(lenBuff);
306                ByteBuffer bb = ByteBuffer.allocate(lenBuff.capacity() + data.length);
307                bb.put(lenBuff).put(data);
308                bb.rewind();
309                //outBuffer(bb);
310                
311                return bb;
312        }
313
314        /**
315         * @param in
316         * @return
317         */
318        protected ByteBuffer encodeDoubleArray(Object in) {
319                Object[] data = (Object[]) in;
320
321                int ssize = varintSize(data.length);
322                
323                ByteBuffer b = ByteBuffer.allocate(ssize + data.length * 8);
324
325                putVarint(b, data.length, ssize);
326
327                for (int i = 0; i < data.length; i++) {
328                        b.putDouble((Double) data[i]);
329                }
330                b.rewind();
331                return b;
332        }
333
334        /**
335         * @param in The double to encode
336         * @return ByteBuffer with encoded double in it
337         */
338        protected ByteBuffer encodeDouble(Object in) {
339                ByteBuffer bb = ByteBuffer.allocate(8).putDouble((Double) in);
340                bb.rewind();
341                return bb;
342        }
343
344        /**
345         * @param in The float array to encode
346         * @return ByteBuffer with encoded float array in it
347         */
348        protected ByteBuffer encodeFloatArray(Object in) {
349                Object[] data = (Object[]) in;
350                
351                int ssize = varintSize(data.length);
352                
353                ByteBuffer b = ByteBuffer.allocate(ssize + data.length * 4);
354                
355                putVarint(b, data.length, ssize);
356
357                for (int i = 0; i < data.length; i++) {
358                        b.putFloat((Float) data[i]);
359                }
360                b.rewind();
361                return b;
362        }
363
364        /**
365         * @param in The float to encode
366         * @return ByteBuffer with encoded float in it
367         */
368        protected ByteBuffer encodeFloat(Object in) {
369                ByteBuffer b = ByteBuffer.allocate(4);
370                b.putFloat(((Float) in));
371                b.rewind();
372                return b;
373        }
374
375        /**
376         * @param in The long array to encode
377         * @return ByteBuffer with encoded long array in it
378         */
379        protected ByteBuffer encodeLongArray(Object in) {
380                return encodeVarintArray(in);
381        }
382
383        /**
384         * @param in The long to encode
385         * @return ByteBuffer with encoded long in it
386         */
387        protected ByteBuffer encodeLong(Object in) {
388                return encodeVarint(in);
389        }
390
391        /**
392         * @param in The integer array to encode
393         * @return ByteBuffer with encoded integer array in it
394         */
395        protected ByteBuffer encodeIntArray(Object in) {
396                return encodeVarintArray(in);
397        }
398
399        /**
400         * @param in The integer to encode
401         * @return ByteBuffer with encoded integer in it
402         */
403        protected ByteBuffer encodeInt(Object in) {
404                return encodeVarint(in);
405        }
406
407        /**
408         * @param in
409         * @return
410         */
411        protected ByteBuffer encodeShortArray(Object in) {
412                return encodeVarintArray(in);
413        }
414
415        /**
416         * @param in
417         * @return
418         */
419        protected ByteBuffer encodeShort(Object in) {
420                return encodeVarint(in);
421        }
422
423        /**
424         * @param in
425         * @return
426         */
427        protected ByteBuffer encodeByteArray(Object in) {
428                Object[] data = (Object[]) in;
429
430                int ssize = varintSize(data.length);
431                
432                ByteBuffer b = ByteBuffer.allocate(ssize + data.length);
433                
434                putVarint(b, data.length, ssize);
435
436                for (int i = 0; i < data.length; i++) {
437                        b.put((Byte) data[i]);
438                }
439                b.rewind();
440                return b;
441        }
442
443        /**
444         * @param in
445         * @return
446         */
447        protected ByteBuffer encodeByte(Object in) {
448                ByteBuffer b = ByteBuffer.allocate(1);
449                b.put(((Byte) in));
450                b.rewind();
451                return b;
452        }
453
454        /**
455         * @param in
456         * @return
457         */
458        protected ByteBuffer encodeBooleanArray(Object in) {
459                Object[] data = (Object[]) in;
460
461                int ssize = varintSize(data.length);
462                
463                ByteBuffer b = ByteBuffer.allocate(ssize + data.length);
464                
465                putVarint(b, data.length, ssize);
466
467                for (int i = 0; i < data.length; i++) {
468                        b.put((byte) ((Boolean) data[i] == false ? 0 : 1));
469                }
470                b.rewind();
471                return b;
472        }
473
474        /**
475         * @param in
476         * @return
477         */
478        protected ByteBuffer encodeBoolean(Object in) {
479                ByteBuffer b = ByteBuffer.allocate(1);
480                b.put((byte) (((Boolean) in) == false ? 0 : 1));
481                b.rewind();
482                return b;
483        }
484
485        private int varintSize(long data){
486                
487                // 7 bits -> 127
488                if(data < (1L << 7)){
489                        return 1;
490                }
491                
492                // 14 bits -> 16383
493                if(data < (1L << 14)){
494                        return 2;
495                }
496                
497                // 21 bits -> 2097151
498                if(data < (1L << 21)){
499                        return 3;
500                }
501                
502                // 28 bits -> 268435455
503                if(data < (1L << 28)){
504                        return 4;
505                }
506
507                // 35 bits -> 34359738367
508                if(data < (1L << 35)){
509                        return 5;
510                }
511
512                // 42 bits -> 4398046511103
513                if(data < (1L << 42)){
514                        return 6;
515                }
516                
517                // 49 bits -> 562949953421311
518                if(data < (1L << 49)){
519                        return 7;
520                }
521                
522                // 56 bits -> 72057594037927935
523                if(data < (1L << 56)){
524                        return 8;
525                }       
526                
527                return 9;
528        }
529        /**
530         * @param in
531         * @return
532         */
533        protected ByteBuffer encodeVarint(Object in) {
534                long data = ((Number)in).longValue();
535                
536                // signed integers encoding
537                // (n << 1) ^ (n >> 31)
538                // OK but java's negative values are two's complements...
539                
540                return encodeUnsignedVarint(data>=0?(data<<1):((Math.abs(data) << 1) ^ 1));
541        }
542
543        /**
544         * @param in
545         * @return
546         */
547        protected ByteBuffer encodeVarintArray(Object in) {
548                Object[] data = (Object[]) in;
549                int[] sizes = new int[data.length];
550                long[] zigzags = new long[data.length];
551                int sumsizes=0;
552                for (int i = 0; i < data.length; i++) {
553                        long datum = ((Number)data[i]).longValue();
554                        // signed integers encoding
555                        // (n << 1) ^ (n >> 31)
556                        // OK but java's negative values are two's complements...
557                        zigzags[i] = datum>0?(datum<<1):((Math.abs(datum) << 1) ^ 1);
558                        
559                        sizes[i] = varintSize(zigzags[i]);
560                        sumsizes+=sizes[i];
561                        //System.out.printf("i=%d, zigzag=%d, size=%d\n",i, zigzags[i], sizes[i]);
562                }               
563                
564                // the size of the size!
565                int ssize = varintSize(data.length);
566                
567                ByteBuffer b = ByteBuffer.allocate(ssize + sumsizes);
568                
569                putVarint(b, data.length, ssize);
570                
571                for (int i = 0; i < data.length; i++) {
572                        putVarint(b, zigzags[i], sizes[i]);
573                }
574                b.rewind();
575                //outBuffer(b);
576                return b;
577        }
578        
579        /**
580         * @param in
581         * @return
582         */
583        protected ByteBuffer encodeUnsignedVarint(Object in) {
584                long data = ((Number)in).longValue();
585                
586                int size = varintSize(data);
587                
588                ByteBuffer buff = ByteBuffer.allocate(size);
589                for(int i = 0; i < size; i++){
590                        int head=128;
591                        if(i==size-1) head = 0;
592                        long b = ((data >> (7*i)) & 127) ^ head;
593                        buff.put((byte)(b & 255 ));
594                }
595                buff.rewind();
596                return  buff;
597        }
598
599        
600        /**
601         * @param b
602         * @param sumsizes
603         * @param ssize
604         */
605        private void putVarint(ByteBuffer buffer, long number, int byteSize) {
606                for(int i = 0; i < byteSize; i++){
607                        int head=128;
608                        if(i==byteSize-1) head = 0;
609                        long b = ((number >> (7*i)) & 127) ^ head;
610                        buffer.put((byte)(b & 255 ));
611                }
612        }
613        
614        /**
615         * @param buff
616         */
617        private void doSend(ByteBuffer buff) {
618
619                if (socket.isClosed()) {
620                        System.err
621                                        .println("NetStreamSender : can't send. The socket is closed.");
622                } else {
623                        buff.rewind();
624                        //outBuffer(buff);
625                        ByteBuffer buffer = packer.packMessage(buff);
626                        ByteBuffer sizeBuffer = packer.packMessageSize(buffer.capacity());
627        
628                        // real sending
629                        try {
630                                out.write(sizeBuffer.array(), 0, sizeBuffer.capacity());
631                                out.write(buffer.array(), 0, buffer.capacity());
632                                out.flush();
633                        } catch (IOException e) {
634                                try {
635                                        socket.close();
636                                } catch (IOException e1) {
637                                        e1.printStackTrace();
638                                }
639                                
640                                System.err.printf("socket error : %s\n", e.getMessage());
641                        }
642                }
643        }
644
645        /*
646         * (non-Javadoc)
647         * 
648         * @see
649         * org.graphstream.stream.AttributeSink#graphAttributeAdded(java.lang.String
650         * , long, java.lang.String, java.lang.Object)
651         */
652        public void graphAttributeAdded(String sourceId, long timeId,
653                        String attribute, Object value) {
654
655                if (!sourceId.equals(this.sourceId)) {
656                        this.sourceId = sourceId;
657                        sourceIdBuff = encodeString(sourceId);
658                        
659                }
660                ByteBuffer attrBuff = encodeString(attribute);
661                int valueType = getType(value);
662                ByteBuffer valueBuff = encodeValue(value, valueType);
663                ByteBuffer buff = ByteBuffer.allocate(
664                                streamBuffer.capacity() + // stream                                                                                                                                                     
665                                1 + // CMD
666                                sourceIdBuff.capacity() + // source id
667                                varintSize(timeId) + // timeId
668                                attrBuff.capacity() + // attribute id
669                                1 + // attr type
670                                valueBuff.capacity()); // attr value
671                
672                streamBuffer.rewind();
673                sourceIdBuff.rewind();
674                buff
675                        .put(streamBuffer)
676                        .put((byte) NetStreamConstants.EVENT_ADD_GRAPH_ATTR)
677                        .put(sourceIdBuff)
678                        .put(encodeUnsignedVarint(timeId))
679                        .put(attrBuff)
680                        .put((byte) valueType)
681                        .put(valueBuff);
682                
683                doSend(buff);
684
685        }
686
687        /*
688         * (non-Javadoc)
689         * 
690         * @see
691         * org.graphstream.stream.AttributeSink#graphAttributeChanged(java.lang.
692         * String, long, java.lang.String, java.lang.Object, java.lang.Object)
693         */
694        public void graphAttributeChanged(String sourceId, long timeId,
695                        String attribute, Object oldValue, Object newValue) {
696
697                if (!sourceId.equals(this.sourceId)) {
698                        this.sourceId = sourceId;
699                        sourceIdBuff = encodeString(sourceId);
700                }
701                ByteBuffer attrBuff = encodeString(attribute);
702                int oldValueType = getType(oldValue);
703                int newValueType = getType(newValue);
704                
705                ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType);
706                ByteBuffer newValueBuff = encodeValue(newValue, newValueType);
707                
708                
709                ByteBuffer buff = ByteBuffer.allocate(
710                                streamBuffer.capacity() + // stream                                                                                                                                                     
711                                1 + // CMD
712                                sourceIdBuff.capacity() + // source id
713                                varintSize(timeId) + // timeId
714                                attrBuff.capacity() + // attribute id
715                                1 + // attr type
716                                oldValueBuff.capacity() + // attr value
717                                1 + // attr type
718                                newValueBuff.capacity()); // attr value
719                
720                streamBuffer.rewind();
721                sourceIdBuff.rewind();
722                
723                buff
724                        .put(streamBuffer)
725                        .put((byte) NetStreamConstants.EVENT_CHG_GRAPH_ATTR)
726                        .put(sourceIdBuff)
727                        .put(encodeUnsignedVarint(timeId))
728                        .put(attrBuff)
729                        .put((byte) oldValueType)
730                        .put(oldValueBuff)
731                        .put((byte) newValueType)
732                        .put(newValueBuff);
733
734                doSend(buff);
735
736        }
737
738        /*
739         * (non-Javadoc)
740         * 
741         * @see
742         * org.graphstream.stream.AttributeSink#graphAttributeRemoved(java.lang.
743         * String, long, java.lang.String)
744         */
745        public void graphAttributeRemoved(String sourceId, long timeId,
746                        String attribute) {
747
748                if (!sourceId.equals(this.sourceId)) {
749                        this.sourceId = sourceId;
750                        sourceIdBuff = encodeString(sourceId);
751                }
752                ByteBuffer attrBuff = encodeString(attribute);
753
754                ByteBuffer buff = ByteBuffer.allocate(
755                                streamBuffer.capacity() + // stream                                                                                                                                                     
756                                1 + // CMD
757                                sourceIdBuff.capacity() + // source id
758                                varintSize(timeId) + // timeId
759                                attrBuff.capacity()
760                                ); // attribute id
761                
762                streamBuffer.rewind();
763                sourceIdBuff.rewind();
764                                                                
765                buff
766                .put(streamBuffer)
767                .put((byte) NetStreamConstants.EVENT_DEL_GRAPH_ATTR)
768                .put(sourceIdBuff)
769                .put(encodeUnsignedVarint(timeId))
770                .put(attrBuff);
771
772                doSend(buff);
773
774        }
775
776        /*
777         * (non-Javadoc)
778         * 
779         * @see
780         * org.graphstream.stream.AttributeSink#nodeAttributeAdded(java.lang.String,
781         * long, java.lang.String, java.lang.String, java.lang.Object)
782         */
783        public void nodeAttributeAdded(String sourceId, long timeId, String nodeId,
784                        String attribute, Object value) {
785
786                if (!sourceId.equals(this.sourceId)) {
787                        this.sourceId = sourceId;
788                        sourceIdBuff = encodeString(sourceId);
789                }
790                ByteBuffer nodeBuff = encodeString(nodeId);
791                ByteBuffer attrBuff = encodeString(attribute);
792                int valueType = getType(value);
793                ByteBuffer valueBuff = encodeValue(value, valueType);
794                
795                ByteBuffer buff = ByteBuffer.allocate(
796                                streamBuffer.capacity() + // stream                                                                                                                                                     
797                                1 + // CMD
798                                sourceIdBuff.capacity() + // source id
799                                varintSize(timeId) + // timeId
800                                nodeBuff.capacity() + // nodeId 
801                                attrBuff.capacity() + // attribute
802                                1 + // value type
803                                valueBuff.capacity() // value
804                );
805                
806                streamBuffer.rewind();
807                sourceIdBuff.rewind();
808                
809                
810                buff
811                .put(streamBuffer)
812                .put((byte) NetStreamConstants.EVENT_ADD_NODE_ATTR)
813                .put(sourceIdBuff)
814                .put(encodeUnsignedVarint(timeId))
815                .put(nodeBuff)
816                .put(attrBuff)
817                .put((byte) valueType)
818                .put(valueBuff);
819
820                
821                doSend(buff);
822
823        }
824
825        /*
826         * (non-Javadoc)
827         * 
828         * @see
829         * org.graphstream.stream.AttributeSink#nodeAttributeChanged(java.lang.String
830         * , long, java.lang.String, java.lang.String, java.lang.Object,
831         * java.lang.Object)
832         */
833        public void nodeAttributeChanged(String sourceId, long timeId,
834                        String nodeId, String attribute, Object oldValue, Object newValue) {
835                if (!sourceId.equals(this.sourceId)) {
836                        this.sourceId = sourceId;
837                        sourceIdBuff = encodeString(sourceId);
838                }
839                
840                ByteBuffer nodeBuff = encodeString(nodeId);
841                ByteBuffer attrBuff = encodeString(attribute);
842                
843                int oldValueType = getType(oldValue);
844                int newValueType = getType(newValue);
845                
846                ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType);
847                ByteBuffer newValueBuff = encodeValue(newValue, newValueType);
848                
849                ByteBuffer buff = ByteBuffer.allocate(
850                                streamBuffer.capacity() + // stream                                                                                                                                                     
851                                1 + // CMD
852                                sourceIdBuff.capacity() + // source id
853                                varintSize(timeId) + // timeId
854                                nodeBuff.capacity() + // nodeId 
855                                attrBuff.capacity() + // attribute
856                                1 + // value type
857                                oldValueBuff.capacity() + // value
858                                1 + // value type
859                                newValueBuff.capacity() // value
860                );
861                
862                streamBuffer.rewind();
863                sourceIdBuff.rewind();
864                
865                
866                buff
867                .put(streamBuffer)
868                .put((byte) NetStreamConstants.EVENT_CHG_NODE_ATTR)
869                .put(sourceIdBuff)
870                .put(encodeUnsignedVarint(timeId))
871                .put(nodeBuff)
872                .put(attrBuff)
873                .put((byte) oldValueType)
874                .put(oldValueBuff)
875                .put((byte) newValueType)
876                .put(newValueBuff);
877
878                doSend(buff);
879        }
880
881        /*
882         * (non-Javadoc)
883         * 
884         * @see
885         * org.graphstream.stream.AttributeSink#nodeAttributeRemoved(java.lang.String
886         * , long, java.lang.String, java.lang.String)
887         */
888        public void nodeAttributeRemoved(String sourceId, long timeId,
889                        String nodeId, String attribute) {
890
891                if (!sourceId.equals(this.sourceId)) {
892                        this.sourceId = sourceId;
893                        sourceIdBuff = encodeString(sourceId);
894                }
895                ByteBuffer nodeBuff = encodeString(nodeId);
896                ByteBuffer attrBuff = encodeString(attribute);
897
898                ByteBuffer buff = ByteBuffer.allocate(
899                                streamBuffer.capacity() + // stream                                                                                                                                                     
900                                1 + // CMD
901                                sourceIdBuff.capacity() + // source id
902                                varintSize(timeId) + // timeId
903                                nodeBuff.capacity() + // nodeId 
904                                attrBuff.capacity() // attribute
905                );
906                
907                
908                streamBuffer.rewind();
909                sourceIdBuff.rewind();
910                
911                
912                buff
913                .put(streamBuffer)
914                .put((byte) NetStreamConstants.EVENT_DEL_NODE_ATTR)
915                .put(sourceIdBuff)
916                .put(encodeUnsignedVarint(timeId))
917                .put(nodeBuff)
918                .put(attrBuff);
919                
920                doSend(buff);
921
922        }
923
924        /*
925         * (non-Javadoc)
926         * 
927         * @see
928         * org.graphstream.stream.AttributeSink#edgeAttributeAdded(java.lang.String,
929         * long, java.lang.String, java.lang.String, java.lang.Object)
930         */
931        public void edgeAttributeAdded(String sourceId, long timeId, String edgeId,
932                        String attribute, Object value) {
933
934                if (!sourceId.equals(this.sourceId)) {
935                        this.sourceId = sourceId;
936                        sourceIdBuff = encodeString(sourceId);
937                }
938                ByteBuffer edgeBuff = encodeString(edgeId);
939                ByteBuffer attrBuff = encodeString(attribute);
940
941                int valueType = getType(value);
942                
943                ByteBuffer valueBuff = encodeValue(value, valueType);
944                
945                ByteBuffer buff = ByteBuffer.allocate(
946                                streamBuffer.capacity() + // stream                                                                                                                                                     
947                                1 + // CMD
948                                sourceIdBuff.capacity() + // source id
949                                varintSize(timeId) + // timeId
950                                edgeBuff.capacity() + // nodeId 
951                                attrBuff.capacity() + // attribute
952                                1 + // value type
953                                valueBuff.capacity() // value
954                );
955                
956                streamBuffer.rewind();
957                sourceIdBuff.rewind();
958                
959                
960                buff
961                .put(streamBuffer)
962                .put((byte) NetStreamConstants.EVENT_ADD_EDGE_ATTR)
963                .put(sourceIdBuff)
964                .put(encodeUnsignedVarint(timeId))
965                .put(edgeBuff)
966                .put(attrBuff)
967                .put((byte) valueType) // value type
968                .put(valueBuff);
969
970                doSend(buff);
971        }
972
973        /*
974         * (non-Javadoc)
975         * 
976         * @see
977         * org.graphstream.stream.AttributeSink#edgeAttributeChanged(java.lang.String
978         * , long, java.lang.String, java.lang.String, java.lang.Object,
979         * java.lang.Object)
980         */
981        public void edgeAttributeChanged(String sourceId, long timeId,
982                        String edgeId, String attribute, Object oldValue, Object newValue) {
983
984                if (!sourceId.equals(this.sourceId)) {
985                        this.sourceId = sourceId;
986                        sourceIdBuff = encodeString(sourceId);
987                }
988                ByteBuffer edgeBuff = encodeString(edgeId);
989                ByteBuffer attrBuff = encodeString(attribute);
990                int oldValueType = getType(oldValue);
991                int newValueType = getType(newValue);
992
993                ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType);
994                ByteBuffer newValueBuff = encodeValue(newValue, newValueType);
995                
996                ByteBuffer buff = ByteBuffer.allocate(
997                                streamBuffer.capacity() + // stream                                                                                                                                                     
998                                1 + // CMD
999                                sourceIdBuff.capacity() + // source id
1000                                varintSize(timeId) + // timeId
1001                                edgeBuff.capacity() + // nodeId 
1002                                attrBuff.capacity() + // attribute
1003                                1 + // value type
1004                                oldValueBuff.capacity() + // value
1005                                1 + // value type
1006                                newValueBuff.capacity()  // value
1007                );
1008
1009                
1010                streamBuffer.rewind();
1011                sourceIdBuff.rewind();
1012                
1013                
1014                buff
1015                .put(streamBuffer)
1016                .put((byte) NetStreamConstants.EVENT_CHG_EDGE_ATTR)
1017                .put(sourceIdBuff)
1018                .put(encodeUnsignedVarint(timeId))
1019                .put(edgeBuff)
1020                .put(attrBuff)
1021                .put((byte) oldValueType)
1022                .put(oldValueBuff)
1023                .put((byte) newValueType)
1024                .put(newValueBuff);
1025
1026                doSend(buff);
1027
1028        }
1029
1030        /*
1031         * (non-Javadoc)
1032         * 
1033         * @see
1034         * org.graphstream.stream.AttributeSink#edgeAttributeRemoved(java.lang.String
1035         * , long, java.lang.String, java.lang.String)
1036         */
1037        public void edgeAttributeRemoved(String sourceId, long timeId,
1038                        String edgeId, String attribute) {
1039
1040                if (!sourceId.equals(this.sourceId)) {
1041                        this.sourceId = sourceId;
1042                        sourceIdBuff = encodeString(sourceId);
1043                        }
1044                ByteBuffer edgeBuff = encodeString(edgeId);
1045                ByteBuffer attrBuff = encodeString(attribute);
1046                
1047                ByteBuffer buff = ByteBuffer.allocate(
1048                                streamBuffer.capacity() + // stream                                                                                                                                                     
1049                                1 + // CMD
1050                                sourceIdBuff.capacity() + // source id
1051                                varintSize(timeId) + // timeId
1052                                edgeBuff.capacity() + // nodeId 
1053                                attrBuff.capacity() // attribute
1054                );
1055                
1056                
1057                streamBuffer.rewind();
1058                sourceIdBuff.rewind();
1059                
1060
1061                buff
1062                .put(streamBuffer)
1063                .put((byte) NetStreamConstants.EVENT_DEL_EDGE_ATTR)
1064                .put(sourceIdBuff)
1065                .put(encodeUnsignedVarint(timeId))
1066                .put(edgeBuff)
1067                .put(attrBuff);
1068                
1069
1070                doSend(buff);
1071
1072        }
1073
1074        /*
1075         * (non-Javadoc)
1076         * 
1077         * @see org.graphstream.stream.ElementSink#nodeAdded(java.lang.String, long,
1078         * java.lang.String)
1079         */
1080        public void nodeAdded(String sourceId, long timeId, String nodeId) {
1081
1082                if (!sourceId.equals(this.sourceId)) {
1083                        this.sourceId = sourceId;
1084                        sourceIdBuff = encodeString(sourceId);
1085                }
1086                ByteBuffer nodeBuff = encodeString(nodeId);
1087                
1088                
1089                ByteBuffer buff = ByteBuffer.allocate(
1090                                streamBuffer.capacity() + // stream                                                                                                                                                     
1091                                1 + // CMD
1092                                sourceIdBuff.capacity() + // source id
1093                                varintSize(timeId) + // timeId
1094                                nodeBuff.capacity() // nodeId 
1095                );
1096                
1097                streamBuffer.rewind();
1098                sourceIdBuff.rewind();
1099                
1100                
1101                buff
1102                .put(streamBuffer)
1103                .put((byte) NetStreamConstants.EVENT_ADD_NODE)
1104                .put(sourceIdBuff)
1105                .put(encodeUnsignedVarint(timeId))
1106                .put(nodeBuff);
1107                
1108                
1109                doSend(buff);
1110
1111        }
1112
1113        /*
1114         * (non-Javadoc)
1115         * 
1116         * @see org.graphstream.stream.ElementSink#nodeRemoved(java.lang.String,
1117         * long, java.lang.String)
1118         */
1119        public void nodeRemoved(String sourceId, long timeId, String nodeId) {
1120                if (!sourceId.equals(this.sourceId)) {
1121                        this.sourceId = sourceId;
1122                        sourceIdBuff = encodeString(sourceId);
1123                }
1124                ByteBuffer nodeBuff = encodeString(nodeId);
1125                
1126                ByteBuffer buff = ByteBuffer.allocate(
1127                                streamBuffer.capacity() + // stream                                                                                                                                                     
1128                                1 + // CMD
1129                                sourceIdBuff.capacity() + // source id
1130                                varintSize(timeId) + // timeId
1131                                nodeBuff.capacity() // nodeId 
1132                );
1133                
1134                streamBuffer.rewind();
1135                sourceIdBuff.rewind();
1136                
1137                
1138                buff
1139                .put(streamBuffer)
1140                .put((byte) NetStreamConstants.EVENT_DEL_NODE)
1141                .put(sourceIdBuff)
1142                .put(encodeUnsignedVarint(timeId))
1143                .put(nodeBuff);
1144                
1145                doSend(buff);
1146        }
1147
1148        /*
1149         * (non-Javadoc)
1150         * 
1151         * @see org.graphstream.stream.ElementSink#edgeAdded(java.lang.String, long,
1152         * java.lang.String, java.lang.String, java.lang.String, boolean)
1153         */
1154        public void edgeAdded(String sourceId, long timeId, String edgeId,
1155                        String fromNodeId, String toNodeId, boolean directed) {
1156
1157                if (!sourceId.equals(this.sourceId)) {
1158                        this.sourceId = sourceId;
1159                        sourceIdBuff = encodeString(sourceId);
1160                }
1161                ByteBuffer edgeBuff = encodeString(edgeId);
1162                ByteBuffer fromNodeBuff = encodeString(fromNodeId);
1163                ByteBuffer toNodeBuff = encodeString(toNodeId);
1164                
1165                ByteBuffer buff = ByteBuffer.allocate(
1166                                streamBuffer.capacity() + // stream                                                                                                                                                     
1167                                1 + // CMD
1168                                sourceIdBuff.capacity() + // source id
1169                                varintSize(timeId) + // timeId
1170                                edgeBuff.capacity() + // edge
1171                                fromNodeBuff.capacity() + // from nodeId
1172                                toNodeBuff.capacity() + // to nodeId 
1173                                1 // direction
1174                );
1175                
1176                streamBuffer.rewind();
1177                sourceIdBuff.rewind();
1178                
1179                
1180                buff
1181                .put(streamBuffer)
1182                .put((byte) NetStreamConstants.EVENT_ADD_EDGE)
1183                .put(sourceIdBuff)
1184                .put(encodeUnsignedVarint(timeId))
1185                .put(edgeBuff)
1186                .put(fromNodeBuff)
1187                .put(toNodeBuff)
1188                .put((byte) (!directed ? 0 : 1));
1189                
1190
1191                doSend(buff);
1192
1193        }
1194
1195        /*
1196         * (non-Javadoc)
1197         * 
1198         * @see org.graphstream.stream.ElementSink#edgeRemoved(java.lang.String,
1199         * long, java.lang.String)
1200         */
1201        public void edgeRemoved(String sourceId, long timeId, String edgeId) {
1202
1203                if (!sourceId.equals(this.sourceId)) {
1204                        this.sourceId = sourceId;
1205                        sourceIdBuff = encodeString(sourceId);
1206                }
1207                ByteBuffer edgeBuff = encodeString(edgeId);
1208                
1209                ByteBuffer buff = ByteBuffer.allocate(
1210                                streamBuffer.capacity() + // stream                                                                                                                                                     
1211                                1 + // CMD
1212                                sourceIdBuff.capacity() + // source id
1213                                varintSize(timeId) + // timeId
1214                                edgeBuff.capacity()  // edge
1215                );
1216                
1217                streamBuffer.rewind();
1218                sourceIdBuff.rewind();
1219                
1220                
1221                buff
1222                .put(streamBuffer)
1223                .put((byte) NetStreamConstants.EVENT_DEL_EDGE)
1224                .put(sourceIdBuff)
1225                .put(encodeUnsignedVarint(timeId))
1226                .put(edgeBuff);
1227
1228                doSend(buff);
1229
1230        }
1231
1232        /*
1233         * (non-Javadoc)
1234         * 
1235         * @see org.graphstream.stream.ElementSink#graphCleared(java.lang.String,
1236         * long)
1237         */
1238        public void graphCleared(String sourceId, long timeId) {
1239
1240                if (!sourceId.equals(this.sourceId)) {
1241                        this.sourceId = sourceId;
1242                        sourceIdBuff = encodeString(sourceId);
1243                }
1244                ByteBuffer buff = ByteBuffer.allocate(
1245                                streamBuffer.capacity() + // stream                                                                                                                                                     
1246                                1 + // CMD
1247                                sourceIdBuff.capacity() + // source id
1248                                varintSize(timeId)
1249                );
1250                
1251                streamBuffer.rewind();
1252                sourceIdBuff.rewind();
1253                
1254                
1255                buff
1256                .put(streamBuffer)
1257                .put((byte) NetStreamConstants.EVENT_CLEARED)
1258                .put(sourceIdBuff)
1259                .put(encodeUnsignedVarint(timeId));
1260
1261                doSend(buff);
1262
1263        }
1264
1265        /*
1266         * (non-Javadoc)
1267         * 
1268         * @see org.graphstream.stream.ElementSink#stepBegins(java.lang.String,
1269         * long, double)
1270         */
1271        public void stepBegins(String sourceId, long timeId, double step) {
1272
1273                if (!sourceId.equals(this.sourceId)) {
1274                        this.sourceId = sourceId;
1275                        sourceIdBuff = encodeString(sourceId);
1276                }
1277                
1278                ByteBuffer buff = ByteBuffer.allocate(
1279                                streamBuffer.capacity() + // stream                                                                                                                                                     
1280                                1 + // CMD
1281                                sourceIdBuff.capacity() + // source id
1282                                varintSize(timeId) +
1283                                8 // time
1284                );
1285                
1286                streamBuffer.rewind();
1287                sourceIdBuff.rewind();
1288                                
1289                buff
1290                .put(streamBuffer)
1291                .put((byte) NetStreamConstants.EVENT_STEP)
1292                .put(sourceIdBuff)
1293                .put(encodeUnsignedVarint(timeId))
1294                .putDouble(step);
1295                
1296                
1297                doSend(buff);
1298        }
1299
1300        /**
1301         * Force the connection to close (properly) with the server
1302         * 
1303         * @throws IOException
1304         */
1305        public void close() throws IOException {
1306                socket.close();
1307        }
1308
1309}