001/* 002 * Copyright 2006 - 2013 003 * Stefan Balev <stefan.balev@graphstream-project.org> 004 * Julien Baudry <julien.baudry@graphstream-project.org> 005 * Antoine Dutot <antoine.dutot@graphstream-project.org> 006 * Yoann Pigné <yoann.pigne@graphstream-project.org> 007 * Guilhelm Savin <guilhelm.savin@graphstream-project.org> 008 * 009 * This file is part of GraphStream <http://graphstream-project.org>. 010 * 011 * GraphStream is a library whose purpose is to handle static or dynamic 012 * graph, create them from scratch, file or any source and display them. 013 * 014 * This program is free software distributed under the terms of two licenses, the 015 * CeCILL-C license that fits European law, and the GNU Lesser General Public 016 * License. You can use, modify and/ or redistribute the software under the terms 017 * of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the following 018 * URL <http://www.cecill.info> or under the terms of the GNU LGPL as published by 019 * the Free Software Foundation, either version 3 of the License, or (at your 020 * option) any later version. 021 * 022 * This program is distributed in the hope that it will be useful, but WITHOUT ANY 023 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 024 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 025 * 026 * You should have received a copy of the GNU Lesser General Public License 027 * along with this program. If not, see <http://www.gnu.org/licenses/>. 028 * 029 * The fact that you are presently reading this means that you have had 030 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms. 031 */ 032package org.graphstream.stream.netstream; 033 034import java.io.BufferedOutputStream; 035import java.io.IOException; 036import java.net.Socket; 037import java.net.UnknownHostException; 038import java.nio.ByteBuffer; 039import java.nio.charset.Charset; 040 041import org.graphstream.stream.Sink; 042import org.graphstream.stream.netstream.packing.NetStreamPacker; 043 044/** 045 * <p> 046 * This class implements a sender according to specifications the NetStream 047 * protocol. 048 * </p> 049 * 050 * <p> 051 * See {@link NetStreamConstants} for a full description of the protocol, the 052 * sender and the receiver. 053 * </p> 054 * 055 * @see NetStreamConstants 056 * @see NetStreamReceiver 057 * 058 * 059 * Copyright (c) 2010 University of Luxembourg 060 * 061 * NetStreamSender.java 062 * @since Aug 10, 2011 063 * 064 * @author Yoann Pigné 065 * 066 */ 067public class NetStreamSender implements Sink { 068 private static ByteBuffer NULL_BUFFER = ByteBuffer.allocate(0); 069 070 protected String stream; 071 protected ByteBuffer streamBuffer; 072 byte[] streamIdArray; 073 protected String host; 074 protected int port; 075 protected Socket socket; 076 protected BufferedOutputStream out; 077 078 protected String sourceId = ""; 079 protected ByteBuffer sourceIdBuff; 080 081 class DefaultPacker extends NetStreamPacker { 082 ByteBuffer sizeBuffer = ByteBuffer.allocate(4); 083 084 @Override 085 public ByteBuffer packMessage(ByteBuffer buffer, int startIndex, 086 int endIndex) { 087 return buffer; 088 } 089 090 @Override 091 public ByteBuffer packMessageSize(int capacity) { 092 sizeBuffer.rewind(); 093 sizeBuffer.putInt(capacity); 094 return sizeBuffer; 095 } 096 097 }; 098 099 protected NetStreamPacker packer = new DefaultPacker(); 100 101 public NetStreamSender(String host, int port) throws UnknownHostException, 102 IOException { 103 this("default", host, port); 104 } 105 public NetStreamSender(int port) throws UnknownHostException, IOException { 106 this("default", "localhost", port); 107 } 108 109 public NetStreamSender(String stream, String host, int port) 110 throws UnknownHostException, IOException { 111 this.stream = stream; 112 this.host = host; 113 this.port = port; 114 setStream(stream); 115 116 connect(); 117 118 } 119 120 /** 121 * @param stream2 122 */ 123 public void setStream(String stream) { 124 streamIdArray = stream.getBytes(Charset.forName("UTF-8")); 125 streamBuffer = encodeString(stream); 126 127 128 } 129 public NetStreamSender(Socket socket) throws IOException { 130 this("default", socket); 131 } 132 133 public NetStreamSender(String stream, Socket socket) throws IOException { 134 this.host = socket.getInetAddress().getHostName(); 135 this.port = socket.getPort(); 136 this.socket = socket; 137 this.out = new BufferedOutputStream(socket.getOutputStream()); 138 this.streamIdArray = stream.getBytes(Charset.forName("UTF-8")); 139 } 140 141 /** 142 * Sets an optional NetStreamPaker whose "pack" method will be called on 143 * each message. 144 * 145 * a Packer can do extra encoding on the all byte array message, it may also 146 * crypt things. 147 * 148 * @param paker 149 * The packer object 150 */ 151 public void setPacker(NetStreamPacker paker) { 152 this.packer = paker; 153 } 154 public void removePacker() { 155 packer = new DefaultPacker(); 156 } 157 158 protected void connect() throws UnknownHostException, IOException { 159 160 socket = new Socket(host, port); 161 out = new BufferedOutputStream(socket.getOutputStream()); 162 163 } 164 165 protected int getType(Object value) { 166 int valueType = NetStreamConstants.TYPE_UNKNOWN; 167 168 if (value == null) 169 return NetStreamConstants.TYPE_NULL; 170 171 Class<?> valueClass = value.getClass(); 172 boolean isArray = valueClass.isArray(); 173 if (isArray) { 174 valueClass = ((Object[]) value)[0].getClass(); 175 } 176 if (valueClass.equals(Boolean.class)) { 177 if (isArray) { 178 valueType = NetStreamConstants.TYPE_BOOLEAN_ARRAY; 179 } else { 180 valueType = NetStreamConstants.TYPE_BOOLEAN; 181 } 182 } else if (valueClass.equals(Byte.class)) { 183 if (isArray) { 184 valueType = NetStreamConstants.TYPE_BYTE_ARRAY; 185 } else { 186 valueType = NetStreamConstants.TYPE_BYTE; 187 } 188 } else if (valueClass.equals(Short.class)) { 189 if (isArray) { 190 valueType = NetStreamConstants.TYPE_SHORT_ARRAY; 191 } else { 192 valueType = NetStreamConstants.TYPE_SHORT; 193 } 194 } else if (valueClass.equals(Integer.class)) { 195 if (isArray) { 196 valueType = NetStreamConstants.TYPE_INT_ARRAY; 197 } else { 198 valueType = NetStreamConstants.TYPE_INT; 199 } 200 } else if (valueClass.equals(Long.class)) { 201 if (isArray) { 202 valueType = NetStreamConstants.TYPE_LONG_ARRAY; 203 } else { 204 valueType = NetStreamConstants.TYPE_LONG; 205 } 206 } else if (valueClass.equals(Float.class)) { 207 if (isArray) { 208 valueType = NetStreamConstants.TYPE_FLOAT_ARRAY; 209 } else { 210 valueType = NetStreamConstants.TYPE_FLOAT; 211 } 212 } else if (valueClass.equals(Double.class)) { 213 if (isArray) { 214 valueType = NetStreamConstants.TYPE_DOUBLE_ARRAY; 215 } else { 216 valueType = NetStreamConstants.TYPE_DOUBLE; 217 } 218 } else if (valueClass.equals(String.class)) { 219 if (isArray) { 220 valueType = NetStreamConstants.TYPE_ARRAY; 221 } else { 222 valueType = NetStreamConstants.TYPE_STRING; 223 } 224 } else 225 System.err.printf("[warning] can not find type of %s\n", valueClass); 226 // System.out.println("ValueType="+valueType+" "+value.getClass()); 227 return valueType; 228 } 229 230 protected ByteBuffer encodeValue(Object in, int valueType) { 231 232 if (NetStreamConstants.TYPE_BOOLEAN == valueType) { 233 return encodeBoolean(in); 234 } else if (NetStreamConstants.TYPE_BOOLEAN_ARRAY == valueType) { 235 return encodeBooleanArray(in); 236 } else if (NetStreamConstants.TYPE_BYTE == valueType) { 237 return encodeByte(in); 238 } else if (NetStreamConstants.TYPE_BYTE_ARRAY == valueType) { 239 return encodeByteArray(in); 240 } else if (NetStreamConstants.TYPE_SHORT == valueType) { 241 return encodeShort(in); 242 } else if (NetStreamConstants.TYPE_SHORT_ARRAY == valueType) { 243 return encodeShortArray(in); 244 } else if (NetStreamConstants.TYPE_INT == valueType) { 245 return encodeInt(in); 246 } else if (NetStreamConstants.TYPE_INT_ARRAY == valueType) { 247 return encodeIntArray(in); 248 } else if (NetStreamConstants.TYPE_LONG == valueType) { 249 return encodeLong(in); 250 } else if (NetStreamConstants.TYPE_LONG_ARRAY == valueType) { 251 return encodeLongArray(in); 252 } else if (NetStreamConstants.TYPE_FLOAT == valueType) { 253 return encodeFloat(in); 254 } else if (NetStreamConstants.TYPE_FLOAT_ARRAY == valueType) { 255 return encodeFloatArray(in); 256 } else if (NetStreamConstants.TYPE_DOUBLE == valueType) { 257 return encodeDouble(in); 258 } else if (NetStreamConstants.TYPE_DOUBLE_ARRAY == valueType) { 259 return encodeDoubleArray(in); 260 } else if (NetStreamConstants.TYPE_STRING == valueType) { 261 return encodeString(in); 262 } else if (NetStreamConstants.TYPE_ARRAY == valueType) { 263 return encodeArray(in); 264 } else if (NetStreamConstants.TYPE_NULL == valueType) { 265 return NULL_BUFFER; 266 } 267 268 System.err.printf("[warning] unknown value type %d\n", valueType); 269 270 return null; 271 } 272 273 /** 274 * @param in 275 * @return 276 */ 277 protected ByteBuffer encodeArray(Object in) { 278 // TODO... 279 return null; 280 } 281 282 /* 283 private void outBuffer(ByteBuffer buf){ 284 System.out.println(buf.toString()); 285 int nbytes = buf.capacity(); 286 int at = buf.position(); 287 for(int i=0; i< nbytes; i++){ 288 int bt = buf.get(at+i); 289 if (bt < 0) bt = (bt & 127) + (bt & 128); 290 System.out.printf("%d ", bt); 291 } 292 System.out.println(); 293 }*/ 294 295 /** 296 * @param in 297 * @return 298 */ 299 protected ByteBuffer encodeString(Object in) { 300 //System.out.println("They want me to encode this string: "+in); 301 String s = (String) in; 302 byte[] data = s.getBytes(Charset.forName("UTF-8")); 303 304 ByteBuffer lenBuff = encodeUnsignedVarint(data.length); 305 //outBuffer(lenBuff); 306 ByteBuffer bb = ByteBuffer.allocate(lenBuff.capacity() + data.length); 307 bb.put(lenBuff).put(data); 308 bb.rewind(); 309 //outBuffer(bb); 310 311 return bb; 312 } 313 314 /** 315 * @param in 316 * @return 317 */ 318 protected ByteBuffer encodeDoubleArray(Object in) { 319 Object[] data = (Object[]) in; 320 321 int ssize = varintSize(data.length); 322 323 ByteBuffer b = ByteBuffer.allocate(ssize + data.length * 8); 324 325 putVarint(b, data.length, ssize); 326 327 for (int i = 0; i < data.length; i++) { 328 b.putDouble((Double) data[i]); 329 } 330 b.rewind(); 331 return b; 332 } 333 334 /** 335 * @param in The double to encode 336 * @return ByteBuffer with encoded double in it 337 */ 338 protected ByteBuffer encodeDouble(Object in) { 339 ByteBuffer bb = ByteBuffer.allocate(8).putDouble((Double) in); 340 bb.rewind(); 341 return bb; 342 } 343 344 /** 345 * @param in The float array to encode 346 * @return ByteBuffer with encoded float array in it 347 */ 348 protected ByteBuffer encodeFloatArray(Object in) { 349 Object[] data = (Object[]) in; 350 351 int ssize = varintSize(data.length); 352 353 ByteBuffer b = ByteBuffer.allocate(ssize + data.length * 4); 354 355 putVarint(b, data.length, ssize); 356 357 for (int i = 0; i < data.length; i++) { 358 b.putFloat((Float) data[i]); 359 } 360 b.rewind(); 361 return b; 362 } 363 364 /** 365 * @param in The float to encode 366 * @return ByteBuffer with encoded float in it 367 */ 368 protected ByteBuffer encodeFloat(Object in) { 369 ByteBuffer b = ByteBuffer.allocate(4); 370 b.putFloat(((Float) in)); 371 b.rewind(); 372 return b; 373 } 374 375 /** 376 * @param in The long array to encode 377 * @return ByteBuffer with encoded long array in it 378 */ 379 protected ByteBuffer encodeLongArray(Object in) { 380 return encodeVarintArray(in); 381 } 382 383 /** 384 * @param in The long to encode 385 * @return ByteBuffer with encoded long in it 386 */ 387 protected ByteBuffer encodeLong(Object in) { 388 return encodeVarint(in); 389 } 390 391 /** 392 * @param in The integer array to encode 393 * @return ByteBuffer with encoded integer array in it 394 */ 395 protected ByteBuffer encodeIntArray(Object in) { 396 return encodeVarintArray(in); 397 } 398 399 /** 400 * @param in The integer to encode 401 * @return ByteBuffer with encoded integer in it 402 */ 403 protected ByteBuffer encodeInt(Object in) { 404 return encodeVarint(in); 405 } 406 407 /** 408 * @param in 409 * @return 410 */ 411 protected ByteBuffer encodeShortArray(Object in) { 412 return encodeVarintArray(in); 413 } 414 415 /** 416 * @param in 417 * @return 418 */ 419 protected ByteBuffer encodeShort(Object in) { 420 return encodeVarint(in); 421 } 422 423 /** 424 * @param in 425 * @return 426 */ 427 protected ByteBuffer encodeByteArray(Object in) { 428 Object[] data = (Object[]) in; 429 430 int ssize = varintSize(data.length); 431 432 ByteBuffer b = ByteBuffer.allocate(ssize + data.length); 433 434 putVarint(b, data.length, ssize); 435 436 for (int i = 0; i < data.length; i++) { 437 b.put((Byte) data[i]); 438 } 439 b.rewind(); 440 return b; 441 } 442 443 /** 444 * @param in 445 * @return 446 */ 447 protected ByteBuffer encodeByte(Object in) { 448 ByteBuffer b = ByteBuffer.allocate(1); 449 b.put(((Byte) in)); 450 b.rewind(); 451 return b; 452 } 453 454 /** 455 * @param in 456 * @return 457 */ 458 protected ByteBuffer encodeBooleanArray(Object in) { 459 Object[] data = (Object[]) in; 460 461 int ssize = varintSize(data.length); 462 463 ByteBuffer b = ByteBuffer.allocate(ssize + data.length); 464 465 putVarint(b, data.length, ssize); 466 467 for (int i = 0; i < data.length; i++) { 468 b.put((byte) ((Boolean) data[i] == false ? 0 : 1)); 469 } 470 b.rewind(); 471 return b; 472 } 473 474 /** 475 * @param in 476 * @return 477 */ 478 protected ByteBuffer encodeBoolean(Object in) { 479 ByteBuffer b = ByteBuffer.allocate(1); 480 b.put((byte) (((Boolean) in) == false ? 0 : 1)); 481 b.rewind(); 482 return b; 483 } 484 485 private int varintSize(long data){ 486 487 // 7 bits -> 127 488 if(data < (1L << 7)){ 489 return 1; 490 } 491 492 // 14 bits -> 16383 493 if(data < (1L << 14)){ 494 return 2; 495 } 496 497 // 21 bits -> 2097151 498 if(data < (1L << 21)){ 499 return 3; 500 } 501 502 // 28 bits -> 268435455 503 if(data < (1L << 28)){ 504 return 4; 505 } 506 507 // 35 bits -> 34359738367 508 if(data < (1L << 35)){ 509 return 5; 510 } 511 512 // 42 bits -> 4398046511103 513 if(data < (1L << 42)){ 514 return 6; 515 } 516 517 // 49 bits -> 562949953421311 518 if(data < (1L << 49)){ 519 return 7; 520 } 521 522 // 56 bits -> 72057594037927935 523 if(data < (1L << 56)){ 524 return 8; 525 } 526 527 return 9; 528 } 529 /** 530 * @param in 531 * @return 532 */ 533 protected ByteBuffer encodeVarint(Object in) { 534 long data = ((Number)in).longValue(); 535 536 // signed integers encoding 537 // (n << 1) ^ (n >> 31) 538 // OK but java's negative values are two's complements... 539 540 return encodeUnsignedVarint(data>=0?(data<<1):((Math.abs(data) << 1) ^ 1)); 541 } 542 543 /** 544 * @param in 545 * @return 546 */ 547 protected ByteBuffer encodeVarintArray(Object in) { 548 Object[] data = (Object[]) in; 549 int[] sizes = new int[data.length]; 550 long[] zigzags = new long[data.length]; 551 int sumsizes=0; 552 for (int i = 0; i < data.length; i++) { 553 long datum = ((Number)data[i]).longValue(); 554 // signed integers encoding 555 // (n << 1) ^ (n >> 31) 556 // OK but java's negative values are two's complements... 557 zigzags[i] = datum>0?(datum<<1):((Math.abs(datum) << 1) ^ 1); 558 559 sizes[i] = varintSize(zigzags[i]); 560 sumsizes+=sizes[i]; 561 //System.out.printf("i=%d, zigzag=%d, size=%d\n",i, zigzags[i], sizes[i]); 562 } 563 564 // the size of the size! 565 int ssize = varintSize(data.length); 566 567 ByteBuffer b = ByteBuffer.allocate(ssize + sumsizes); 568 569 putVarint(b, data.length, ssize); 570 571 for (int i = 0; i < data.length; i++) { 572 putVarint(b, zigzags[i], sizes[i]); 573 } 574 b.rewind(); 575 //outBuffer(b); 576 return b; 577 } 578 579 /** 580 * @param in 581 * @return 582 */ 583 protected ByteBuffer encodeUnsignedVarint(Object in) { 584 long data = ((Number)in).longValue(); 585 586 int size = varintSize(data); 587 588 ByteBuffer buff = ByteBuffer.allocate(size); 589 for(int i = 0; i < size; i++){ 590 int head=128; 591 if(i==size-1) head = 0; 592 long b = ((data >> (7*i)) & 127) ^ head; 593 buff.put((byte)(b & 255 )); 594 } 595 buff.rewind(); 596 return buff; 597 } 598 599 600 /** 601 * @param b 602 * @param sumsizes 603 * @param ssize 604 */ 605 private void putVarint(ByteBuffer buffer, long number, int byteSize) { 606 for(int i = 0; i < byteSize; i++){ 607 int head=128; 608 if(i==byteSize-1) head = 0; 609 long b = ((number >> (7*i)) & 127) ^ head; 610 buffer.put((byte)(b & 255 )); 611 } 612 } 613 614 /** 615 * @param buff 616 */ 617 private void doSend(ByteBuffer buff) { 618 619 if (socket.isClosed()) { 620 System.err 621 .println("NetStreamSender : can't send. The socket is closed."); 622 } else { 623 buff.rewind(); 624 //outBuffer(buff); 625 ByteBuffer buffer = packer.packMessage(buff); 626 ByteBuffer sizeBuffer = packer.packMessageSize(buffer.capacity()); 627 628 // real sending 629 try { 630 out.write(sizeBuffer.array(), 0, sizeBuffer.capacity()); 631 out.write(buffer.array(), 0, buffer.capacity()); 632 out.flush(); 633 } catch (IOException e) { 634 try { 635 socket.close(); 636 } catch (IOException e1) { 637 e1.printStackTrace(); 638 } 639 640 System.err.printf("socket error : %s\n", e.getMessage()); 641 } 642 } 643 } 644 645 /* 646 * (non-Javadoc) 647 * 648 * @see 649 * org.graphstream.stream.AttributeSink#graphAttributeAdded(java.lang.String 650 * , long, java.lang.String, java.lang.Object) 651 */ 652 public void graphAttributeAdded(String sourceId, long timeId, 653 String attribute, Object value) { 654 655 if (!sourceId.equals(this.sourceId)) { 656 this.sourceId = sourceId; 657 sourceIdBuff = encodeString(sourceId); 658 659 } 660 ByteBuffer attrBuff = encodeString(attribute); 661 int valueType = getType(value); 662 ByteBuffer valueBuff = encodeValue(value, valueType); 663 ByteBuffer buff = ByteBuffer.allocate( 664 streamBuffer.capacity() + // stream 665 1 + // CMD 666 sourceIdBuff.capacity() + // source id 667 varintSize(timeId) + // timeId 668 attrBuff.capacity() + // attribute id 669 1 + // attr type 670 valueBuff.capacity()); // attr value 671 672 streamBuffer.rewind(); 673 sourceIdBuff.rewind(); 674 buff 675 .put(streamBuffer) 676 .put((byte) NetStreamConstants.EVENT_ADD_GRAPH_ATTR) 677 .put(sourceIdBuff) 678 .put(encodeUnsignedVarint(timeId)) 679 .put(attrBuff) 680 .put((byte) valueType) 681 .put(valueBuff); 682 683 doSend(buff); 684 685 } 686 687 /* 688 * (non-Javadoc) 689 * 690 * @see 691 * org.graphstream.stream.AttributeSink#graphAttributeChanged(java.lang. 692 * String, long, java.lang.String, java.lang.Object, java.lang.Object) 693 */ 694 public void graphAttributeChanged(String sourceId, long timeId, 695 String attribute, Object oldValue, Object newValue) { 696 697 if (!sourceId.equals(this.sourceId)) { 698 this.sourceId = sourceId; 699 sourceIdBuff = encodeString(sourceId); 700 } 701 ByteBuffer attrBuff = encodeString(attribute); 702 int oldValueType = getType(oldValue); 703 int newValueType = getType(newValue); 704 705 ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType); 706 ByteBuffer newValueBuff = encodeValue(newValue, newValueType); 707 708 709 ByteBuffer buff = ByteBuffer.allocate( 710 streamBuffer.capacity() + // stream 711 1 + // CMD 712 sourceIdBuff.capacity() + // source id 713 varintSize(timeId) + // timeId 714 attrBuff.capacity() + // attribute id 715 1 + // attr type 716 oldValueBuff.capacity() + // attr value 717 1 + // attr type 718 newValueBuff.capacity()); // attr value 719 720 streamBuffer.rewind(); 721 sourceIdBuff.rewind(); 722 723 buff 724 .put(streamBuffer) 725 .put((byte) NetStreamConstants.EVENT_CHG_GRAPH_ATTR) 726 .put(sourceIdBuff) 727 .put(encodeUnsignedVarint(timeId)) 728 .put(attrBuff) 729 .put((byte) oldValueType) 730 .put(oldValueBuff) 731 .put((byte) newValueType) 732 .put(newValueBuff); 733 734 doSend(buff); 735 736 } 737 738 /* 739 * (non-Javadoc) 740 * 741 * @see 742 * org.graphstream.stream.AttributeSink#graphAttributeRemoved(java.lang. 743 * String, long, java.lang.String) 744 */ 745 public void graphAttributeRemoved(String sourceId, long timeId, 746 String attribute) { 747 748 if (!sourceId.equals(this.sourceId)) { 749 this.sourceId = sourceId; 750 sourceIdBuff = encodeString(sourceId); 751 } 752 ByteBuffer attrBuff = encodeString(attribute); 753 754 ByteBuffer buff = ByteBuffer.allocate( 755 streamBuffer.capacity() + // stream 756 1 + // CMD 757 sourceIdBuff.capacity() + // source id 758 varintSize(timeId) + // timeId 759 attrBuff.capacity() 760 ); // attribute id 761 762 streamBuffer.rewind(); 763 sourceIdBuff.rewind(); 764 765 buff 766 .put(streamBuffer) 767 .put((byte) NetStreamConstants.EVENT_DEL_GRAPH_ATTR) 768 .put(sourceIdBuff) 769 .put(encodeUnsignedVarint(timeId)) 770 .put(attrBuff); 771 772 doSend(buff); 773 774 } 775 776 /* 777 * (non-Javadoc) 778 * 779 * @see 780 * org.graphstream.stream.AttributeSink#nodeAttributeAdded(java.lang.String, 781 * long, java.lang.String, java.lang.String, java.lang.Object) 782 */ 783 public void nodeAttributeAdded(String sourceId, long timeId, String nodeId, 784 String attribute, Object value) { 785 786 if (!sourceId.equals(this.sourceId)) { 787 this.sourceId = sourceId; 788 sourceIdBuff = encodeString(sourceId); 789 } 790 ByteBuffer nodeBuff = encodeString(nodeId); 791 ByteBuffer attrBuff = encodeString(attribute); 792 int valueType = getType(value); 793 ByteBuffer valueBuff = encodeValue(value, valueType); 794 795 ByteBuffer buff = ByteBuffer.allocate( 796 streamBuffer.capacity() + // stream 797 1 + // CMD 798 sourceIdBuff.capacity() + // source id 799 varintSize(timeId) + // timeId 800 nodeBuff.capacity() + // nodeId 801 attrBuff.capacity() + // attribute 802 1 + // value type 803 valueBuff.capacity() // value 804 ); 805 806 streamBuffer.rewind(); 807 sourceIdBuff.rewind(); 808 809 810 buff 811 .put(streamBuffer) 812 .put((byte) NetStreamConstants.EVENT_ADD_NODE_ATTR) 813 .put(sourceIdBuff) 814 .put(encodeUnsignedVarint(timeId)) 815 .put(nodeBuff) 816 .put(attrBuff) 817 .put((byte) valueType) 818 .put(valueBuff); 819 820 821 doSend(buff); 822 823 } 824 825 /* 826 * (non-Javadoc) 827 * 828 * @see 829 * org.graphstream.stream.AttributeSink#nodeAttributeChanged(java.lang.String 830 * , long, java.lang.String, java.lang.String, java.lang.Object, 831 * java.lang.Object) 832 */ 833 public void nodeAttributeChanged(String sourceId, long timeId, 834 String nodeId, String attribute, Object oldValue, Object newValue) { 835 if (!sourceId.equals(this.sourceId)) { 836 this.sourceId = sourceId; 837 sourceIdBuff = encodeString(sourceId); 838 } 839 840 ByteBuffer nodeBuff = encodeString(nodeId); 841 ByteBuffer attrBuff = encodeString(attribute); 842 843 int oldValueType = getType(oldValue); 844 int newValueType = getType(newValue); 845 846 ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType); 847 ByteBuffer newValueBuff = encodeValue(newValue, newValueType); 848 849 ByteBuffer buff = ByteBuffer.allocate( 850 streamBuffer.capacity() + // stream 851 1 + // CMD 852 sourceIdBuff.capacity() + // source id 853 varintSize(timeId) + // timeId 854 nodeBuff.capacity() + // nodeId 855 attrBuff.capacity() + // attribute 856 1 + // value type 857 oldValueBuff.capacity() + // value 858 1 + // value type 859 newValueBuff.capacity() // value 860 ); 861 862 streamBuffer.rewind(); 863 sourceIdBuff.rewind(); 864 865 866 buff 867 .put(streamBuffer) 868 .put((byte) NetStreamConstants.EVENT_CHG_NODE_ATTR) 869 .put(sourceIdBuff) 870 .put(encodeUnsignedVarint(timeId)) 871 .put(nodeBuff) 872 .put(attrBuff) 873 .put((byte) oldValueType) 874 .put(oldValueBuff) 875 .put((byte) newValueType) 876 .put(newValueBuff); 877 878 doSend(buff); 879 } 880 881 /* 882 * (non-Javadoc) 883 * 884 * @see 885 * org.graphstream.stream.AttributeSink#nodeAttributeRemoved(java.lang.String 886 * , long, java.lang.String, java.lang.String) 887 */ 888 public void nodeAttributeRemoved(String sourceId, long timeId, 889 String nodeId, String attribute) { 890 891 if (!sourceId.equals(this.sourceId)) { 892 this.sourceId = sourceId; 893 sourceIdBuff = encodeString(sourceId); 894 } 895 ByteBuffer nodeBuff = encodeString(nodeId); 896 ByteBuffer attrBuff = encodeString(attribute); 897 898 ByteBuffer buff = ByteBuffer.allocate( 899 streamBuffer.capacity() + // stream 900 1 + // CMD 901 sourceIdBuff.capacity() + // source id 902 varintSize(timeId) + // timeId 903 nodeBuff.capacity() + // nodeId 904 attrBuff.capacity() // attribute 905 ); 906 907 908 streamBuffer.rewind(); 909 sourceIdBuff.rewind(); 910 911 912 buff 913 .put(streamBuffer) 914 .put((byte) NetStreamConstants.EVENT_DEL_NODE_ATTR) 915 .put(sourceIdBuff) 916 .put(encodeUnsignedVarint(timeId)) 917 .put(nodeBuff) 918 .put(attrBuff); 919 920 doSend(buff); 921 922 } 923 924 /* 925 * (non-Javadoc) 926 * 927 * @see 928 * org.graphstream.stream.AttributeSink#edgeAttributeAdded(java.lang.String, 929 * long, java.lang.String, java.lang.String, java.lang.Object) 930 */ 931 public void edgeAttributeAdded(String sourceId, long timeId, String edgeId, 932 String attribute, Object value) { 933 934 if (!sourceId.equals(this.sourceId)) { 935 this.sourceId = sourceId; 936 sourceIdBuff = encodeString(sourceId); 937 } 938 ByteBuffer edgeBuff = encodeString(edgeId); 939 ByteBuffer attrBuff = encodeString(attribute); 940 941 int valueType = getType(value); 942 943 ByteBuffer valueBuff = encodeValue(value, valueType); 944 945 ByteBuffer buff = ByteBuffer.allocate( 946 streamBuffer.capacity() + // stream 947 1 + // CMD 948 sourceIdBuff.capacity() + // source id 949 varintSize(timeId) + // timeId 950 edgeBuff.capacity() + // nodeId 951 attrBuff.capacity() + // attribute 952 1 + // value type 953 valueBuff.capacity() // value 954 ); 955 956 streamBuffer.rewind(); 957 sourceIdBuff.rewind(); 958 959 960 buff 961 .put(streamBuffer) 962 .put((byte) NetStreamConstants.EVENT_ADD_EDGE_ATTR) 963 .put(sourceIdBuff) 964 .put(encodeUnsignedVarint(timeId)) 965 .put(edgeBuff) 966 .put(attrBuff) 967 .put((byte) valueType) // value type 968 .put(valueBuff); 969 970 doSend(buff); 971 } 972 973 /* 974 * (non-Javadoc) 975 * 976 * @see 977 * org.graphstream.stream.AttributeSink#edgeAttributeChanged(java.lang.String 978 * , long, java.lang.String, java.lang.String, java.lang.Object, 979 * java.lang.Object) 980 */ 981 public void edgeAttributeChanged(String sourceId, long timeId, 982 String edgeId, String attribute, Object oldValue, Object newValue) { 983 984 if (!sourceId.equals(this.sourceId)) { 985 this.sourceId = sourceId; 986 sourceIdBuff = encodeString(sourceId); 987 } 988 ByteBuffer edgeBuff = encodeString(edgeId); 989 ByteBuffer attrBuff = encodeString(attribute); 990 int oldValueType = getType(oldValue); 991 int newValueType = getType(newValue); 992 993 ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType); 994 ByteBuffer newValueBuff = encodeValue(newValue, newValueType); 995 996 ByteBuffer buff = ByteBuffer.allocate( 997 streamBuffer.capacity() + // stream 998 1 + // CMD 999 sourceIdBuff.capacity() + // source id 1000 varintSize(timeId) + // timeId 1001 edgeBuff.capacity() + // nodeId 1002 attrBuff.capacity() + // attribute 1003 1 + // value type 1004 oldValueBuff.capacity() + // value 1005 1 + // value type 1006 newValueBuff.capacity() // value 1007 ); 1008 1009 1010 streamBuffer.rewind(); 1011 sourceIdBuff.rewind(); 1012 1013 1014 buff 1015 .put(streamBuffer) 1016 .put((byte) NetStreamConstants.EVENT_CHG_EDGE_ATTR) 1017 .put(sourceIdBuff) 1018 .put(encodeUnsignedVarint(timeId)) 1019 .put(edgeBuff) 1020 .put(attrBuff) 1021 .put((byte) oldValueType) 1022 .put(oldValueBuff) 1023 .put((byte) newValueType) 1024 .put(newValueBuff); 1025 1026 doSend(buff); 1027 1028 } 1029 1030 /* 1031 * (non-Javadoc) 1032 * 1033 * @see 1034 * org.graphstream.stream.AttributeSink#edgeAttributeRemoved(java.lang.String 1035 * , long, java.lang.String, java.lang.String) 1036 */ 1037 public void edgeAttributeRemoved(String sourceId, long timeId, 1038 String edgeId, String attribute) { 1039 1040 if (!sourceId.equals(this.sourceId)) { 1041 this.sourceId = sourceId; 1042 sourceIdBuff = encodeString(sourceId); 1043 } 1044 ByteBuffer edgeBuff = encodeString(edgeId); 1045 ByteBuffer attrBuff = encodeString(attribute); 1046 1047 ByteBuffer buff = ByteBuffer.allocate( 1048 streamBuffer.capacity() + // stream 1049 1 + // CMD 1050 sourceIdBuff.capacity() + // source id 1051 varintSize(timeId) + // timeId 1052 edgeBuff.capacity() + // nodeId 1053 attrBuff.capacity() // attribute 1054 ); 1055 1056 1057 streamBuffer.rewind(); 1058 sourceIdBuff.rewind(); 1059 1060 1061 buff 1062 .put(streamBuffer) 1063 .put((byte) NetStreamConstants.EVENT_DEL_EDGE_ATTR) 1064 .put(sourceIdBuff) 1065 .put(encodeUnsignedVarint(timeId)) 1066 .put(edgeBuff) 1067 .put(attrBuff); 1068 1069 1070 doSend(buff); 1071 1072 } 1073 1074 /* 1075 * (non-Javadoc) 1076 * 1077 * @see org.graphstream.stream.ElementSink#nodeAdded(java.lang.String, long, 1078 * java.lang.String) 1079 */ 1080 public void nodeAdded(String sourceId, long timeId, String nodeId) { 1081 1082 if (!sourceId.equals(this.sourceId)) { 1083 this.sourceId = sourceId; 1084 sourceIdBuff = encodeString(sourceId); 1085 } 1086 ByteBuffer nodeBuff = encodeString(nodeId); 1087 1088 1089 ByteBuffer buff = ByteBuffer.allocate( 1090 streamBuffer.capacity() + // stream 1091 1 + // CMD 1092 sourceIdBuff.capacity() + // source id 1093 varintSize(timeId) + // timeId 1094 nodeBuff.capacity() // nodeId 1095 ); 1096 1097 streamBuffer.rewind(); 1098 sourceIdBuff.rewind(); 1099 1100 1101 buff 1102 .put(streamBuffer) 1103 .put((byte) NetStreamConstants.EVENT_ADD_NODE) 1104 .put(sourceIdBuff) 1105 .put(encodeUnsignedVarint(timeId)) 1106 .put(nodeBuff); 1107 1108 1109 doSend(buff); 1110 1111 } 1112 1113 /* 1114 * (non-Javadoc) 1115 * 1116 * @see org.graphstream.stream.ElementSink#nodeRemoved(java.lang.String, 1117 * long, java.lang.String) 1118 */ 1119 public void nodeRemoved(String sourceId, long timeId, String nodeId) { 1120 if (!sourceId.equals(this.sourceId)) { 1121 this.sourceId = sourceId; 1122 sourceIdBuff = encodeString(sourceId); 1123 } 1124 ByteBuffer nodeBuff = encodeString(nodeId); 1125 1126 ByteBuffer buff = ByteBuffer.allocate( 1127 streamBuffer.capacity() + // stream 1128 1 + // CMD 1129 sourceIdBuff.capacity() + // source id 1130 varintSize(timeId) + // timeId 1131 nodeBuff.capacity() // nodeId 1132 ); 1133 1134 streamBuffer.rewind(); 1135 sourceIdBuff.rewind(); 1136 1137 1138 buff 1139 .put(streamBuffer) 1140 .put((byte) NetStreamConstants.EVENT_DEL_NODE) 1141 .put(sourceIdBuff) 1142 .put(encodeUnsignedVarint(timeId)) 1143 .put(nodeBuff); 1144 1145 doSend(buff); 1146 } 1147 1148 /* 1149 * (non-Javadoc) 1150 * 1151 * @see org.graphstream.stream.ElementSink#edgeAdded(java.lang.String, long, 1152 * java.lang.String, java.lang.String, java.lang.String, boolean) 1153 */ 1154 public void edgeAdded(String sourceId, long timeId, String edgeId, 1155 String fromNodeId, String toNodeId, boolean directed) { 1156 1157 if (!sourceId.equals(this.sourceId)) { 1158 this.sourceId = sourceId; 1159 sourceIdBuff = encodeString(sourceId); 1160 } 1161 ByteBuffer edgeBuff = encodeString(edgeId); 1162 ByteBuffer fromNodeBuff = encodeString(fromNodeId); 1163 ByteBuffer toNodeBuff = encodeString(toNodeId); 1164 1165 ByteBuffer buff = ByteBuffer.allocate( 1166 streamBuffer.capacity() + // stream 1167 1 + // CMD 1168 sourceIdBuff.capacity() + // source id 1169 varintSize(timeId) + // timeId 1170 edgeBuff.capacity() + // edge 1171 fromNodeBuff.capacity() + // from nodeId 1172 toNodeBuff.capacity() + // to nodeId 1173 1 // direction 1174 ); 1175 1176 streamBuffer.rewind(); 1177 sourceIdBuff.rewind(); 1178 1179 1180 buff 1181 .put(streamBuffer) 1182 .put((byte) NetStreamConstants.EVENT_ADD_EDGE) 1183 .put(sourceIdBuff) 1184 .put(encodeUnsignedVarint(timeId)) 1185 .put(edgeBuff) 1186 .put(fromNodeBuff) 1187 .put(toNodeBuff) 1188 .put((byte) (!directed ? 0 : 1)); 1189 1190 1191 doSend(buff); 1192 1193 } 1194 1195 /* 1196 * (non-Javadoc) 1197 * 1198 * @see org.graphstream.stream.ElementSink#edgeRemoved(java.lang.String, 1199 * long, java.lang.String) 1200 */ 1201 public void edgeRemoved(String sourceId, long timeId, String edgeId) { 1202 1203 if (!sourceId.equals(this.sourceId)) { 1204 this.sourceId = sourceId; 1205 sourceIdBuff = encodeString(sourceId); 1206 } 1207 ByteBuffer edgeBuff = encodeString(edgeId); 1208 1209 ByteBuffer buff = ByteBuffer.allocate( 1210 streamBuffer.capacity() + // stream 1211 1 + // CMD 1212 sourceIdBuff.capacity() + // source id 1213 varintSize(timeId) + // timeId 1214 edgeBuff.capacity() // edge 1215 ); 1216 1217 streamBuffer.rewind(); 1218 sourceIdBuff.rewind(); 1219 1220 1221 buff 1222 .put(streamBuffer) 1223 .put((byte) NetStreamConstants.EVENT_DEL_EDGE) 1224 .put(sourceIdBuff) 1225 .put(encodeUnsignedVarint(timeId)) 1226 .put(edgeBuff); 1227 1228 doSend(buff); 1229 1230 } 1231 1232 /* 1233 * (non-Javadoc) 1234 * 1235 * @see org.graphstream.stream.ElementSink#graphCleared(java.lang.String, 1236 * long) 1237 */ 1238 public void graphCleared(String sourceId, long timeId) { 1239 1240 if (!sourceId.equals(this.sourceId)) { 1241 this.sourceId = sourceId; 1242 sourceIdBuff = encodeString(sourceId); 1243 } 1244 ByteBuffer buff = ByteBuffer.allocate( 1245 streamBuffer.capacity() + // stream 1246 1 + // CMD 1247 sourceIdBuff.capacity() + // source id 1248 varintSize(timeId) 1249 ); 1250 1251 streamBuffer.rewind(); 1252 sourceIdBuff.rewind(); 1253 1254 1255 buff 1256 .put(streamBuffer) 1257 .put((byte) NetStreamConstants.EVENT_CLEARED) 1258 .put(sourceIdBuff) 1259 .put(encodeUnsignedVarint(timeId)); 1260 1261 doSend(buff); 1262 1263 } 1264 1265 /* 1266 * (non-Javadoc) 1267 * 1268 * @see org.graphstream.stream.ElementSink#stepBegins(java.lang.String, 1269 * long, double) 1270 */ 1271 public void stepBegins(String sourceId, long timeId, double step) { 1272 1273 if (!sourceId.equals(this.sourceId)) { 1274 this.sourceId = sourceId; 1275 sourceIdBuff = encodeString(sourceId); 1276 } 1277 1278 ByteBuffer buff = ByteBuffer.allocate( 1279 streamBuffer.capacity() + // stream 1280 1 + // CMD 1281 sourceIdBuff.capacity() + // source id 1282 varintSize(timeId) + 1283 8 // time 1284 ); 1285 1286 streamBuffer.rewind(); 1287 sourceIdBuff.rewind(); 1288 1289 buff 1290 .put(streamBuffer) 1291 .put((byte) NetStreamConstants.EVENT_STEP) 1292 .put(sourceIdBuff) 1293 .put(encodeUnsignedVarint(timeId)) 1294 .putDouble(step); 1295 1296 1297 doSend(buff); 1298 } 1299 1300 /** 1301 * Force the connection to close (properly) with the server 1302 * 1303 * @throws IOException 1304 */ 1305 public void close() throws IOException { 1306 socket.close(); 1307 } 1308 1309}