001/* 002 * Copyright 2006 - 2013 003 * Stefan Balev <stefan.balev@graphstream-project.org> 004 * Julien Baudry <julien.baudry@graphstream-project.org> 005 * Antoine Dutot <antoine.dutot@graphstream-project.org> 006 * Yoann Pigné <yoann.pigne@graphstream-project.org> 007 * Guilhelm Savin <guilhelm.savin@graphstream-project.org> 008 * 009 * This file is part of GraphStream <http://graphstream-project.org>. 010 * 011 * GraphStream is a library whose purpose is to handle static or dynamic 012 * graph, create them from scratch, file or any source and display them. 013 * 014 * This program is free software distributed under the terms of two licenses, the 015 * CeCILL-C license that fits European law, and the GNU Lesser General Public 016 * License. You can use, modify and/ or redistribute the software under the terms 017 * of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the following 018 * URL <http://www.cecill.info> or under the terms of the GNU LGPL as published by 019 * the Free Software Foundation, either version 3 of the License, or (at your 020 * option) any later version. 021 * 022 * This program is distributed in the hope that it will be useful, but WITHOUT ANY 023 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 024 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 025 * 026 * You should have received a copy of the GNU Lesser General Public License 027 * along with this program. If not, see <http://www.gnu.org/licenses/>. 028 * 029 * The fact that you are presently reading this means that you have had 030 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms. 031 */ 032package org.graphstream.stream.thread; 033 034import java.util.LinkedList; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.locks.Condition; 037import java.util.concurrent.locks.ReentrantLock; 038 039import org.graphstream.graph.Graph; 040import org.graphstream.stream.ProxyPipe; 041import org.graphstream.stream.Replayable; 042import org.graphstream.stream.Replayable.Controller; 043import org.graphstream.stream.Sink; 044import org.graphstream.stream.Source; 045import org.graphstream.stream.SourceBase; 046 047/** 048 * Filter that allows to pass graph events between two threads without explicit 049 * synchronization. 050 * 051 * <p> 052 * This filter allows to register it as an output for some source of events in a 053 * source thread (hereafter called the input thread) and to register listening 054 * outputs in a destination thread (hereafter called the sink thread). 055 * </p> 056 * 057 * <pre> 058 * | 059 * Source ---> ThreadProxyFilter ----> Sink 060 * Thread 1 | Thread 2 061 * | 062 * </pre> 063 * 064 * <p> 065 * In other words, this class allows to listen in a sink thread graph events 066 * that are produced in another source thread without any explicit 067 * synchronization on the source of events. 068 * </p> 069 * 070 * <p> 071 * The only restriction is that the sink thread must regularly call the 072 * {@link #pump()} method to dispatch events coming from the source to all sinks 073 * registered (see the explanation in {@link org.graphstream.stream.ProxyPipe}). 074 * </p> 075 * 076 * <p> 077 * You can register any kind of input as source of event, but if the input is a 078 * graph, then you can choose to "replay" all the content of the graph so that 079 * at the other end of the filter, all outputs receive the complete content of 080 * the graph. This is the default behavior if this filter is constructed with a 081 * graph as input. 082 * </p> 083 */ 084public class ThreadProxyPipe extends SourceBase implements ProxyPipe { 085 086 /** 087 * Proxy id. 088 */ 089 protected String id; 090 091 /** 092 * The event sender name, usually the graph name. 093 */ 094 protected String from; 095 096 /** 097 * The message box used to exchange messages between the two threads. 098 */ 099 protected LinkedList<GraphEvents> events; 100 protected LinkedList<Object[]> eventsData; 101 102 protected ReentrantLock lock; 103 protected Condition notEmpty; 104 105 /** 106 * Used only to remove the listener. We ensure this is done in the source 107 * thread. 108 */ 109 protected Source input; 110 111 /** 112 * Signals that this proxy must be removed from the source input. 113 */ 114 protected boolean unregisterWhenPossible = false; 115 116 public ThreadProxyPipe() { 117 this.events = new LinkedList<GraphEvents>(); 118 this.eventsData = new LinkedList<Object[]>(); 119 this.lock = new ReentrantLock(); 120 this.notEmpty = this.lock.newCondition(); 121 this.from = "<in>"; 122 this.input = null; 123 } 124 125 /** 126 * 127 * @param input 128 * The source of events we listen at. 129 * 130 * @deprecated Use the default constructor and then call the 131 * {@link #init(Source)} method. 132 */ 133 @Deprecated 134 public ThreadProxyPipe(Source input) { 135 this(input, null, input instanceof Replayable); 136 } 137 138 /** 139 * 140 * @param input 141 * @param replay 142 * 143 * @deprecated Use the default constructor and then call the 144 * {@link #init(Source)} method. 145 */ 146 @Deprecated 147 public ThreadProxyPipe(Source input, boolean replay) { 148 this(input, null, replay); 149 } 150 151 /** 152 * 153 * @param input 154 * @param initialListener 155 * @param replay 156 * 157 * @deprecated Use the default constructor and then call the 158 * {@link #init(Source)} method. 159 */ 160 @Deprecated 161 public ThreadProxyPipe(Source input, Sink initialListener, boolean replay) { 162 this(); 163 164 if (initialListener != null) 165 addSink(initialListener); 166 167 init(input, replay); 168 } 169 170 public void init() { 171 init(null, false); 172 } 173 174 /** 175 * Init the proxy. If there are previous events, they will be cleared. 176 * 177 * @param source 178 * source of the events 179 */ 180 public void init(Source source) { 181 init(source, source instanceof Replayable); 182 } 183 184 /** 185 * Init the proxy. If there are previous events, they will be cleared. 186 * 187 * @param source 188 * source of the events 189 * @param replay 190 * true if the source should be replayed. You need a 191 * {@link org.graphstream.stream.Replayable} source to enable 192 * replay, else nothing happens. 193 */ 194 public void init(Source source, boolean replay) { 195 lock.lock(); 196 197 try { 198 if (this.input != null) 199 this.input.removeSink(this); 200 201 this.input = source; 202 203 this.events.clear(); 204 this.eventsData.clear(); 205 } finally { 206 lock.unlock(); 207 } 208 209 if (source != null) { 210 if (source instanceof Graph) 211 this.from = ((Graph) source).getId(); 212 213 this.input.addSink(this); 214 215 if (replay && source instanceof Replayable) { 216 Replayable r = (Replayable) source; 217 Controller rc = r.getReplayController(); 218 219 rc.addSink(this); 220 rc.replay(); 221 } 222 } 223 } 224 225 @Override 226 public String toString() { 227 String dest = "nil"; 228 229 if (attrSinks.size() > 0) 230 dest = attrSinks.get(0).toString(); 231 232 return String.format("thread-proxy(from %s to %s)", from, dest); 233 } 234 235 /** 236 * Ask the proxy to unregister from the event input source (stop receive 237 * events) as soon as possible (when the next event will occur in the 238 * graph). 239 */ 240 public void unregisterFromSource() { 241 unregisterWhenPossible = true; 242 } 243 244 /** 245 * This method must be called regularly in the output thread to check if the 246 * input source sent events. If some event occurred, the listeners will be 247 * called. 248 */ 249 public void pump() { 250 GraphEvents e = null; 251 Object[] data = null; 252 253 do { 254 lock.lock(); 255 256 try { 257 e = events.poll(); 258 data = eventsData.poll(); 259 } finally { 260 lock.unlock(); 261 } 262 263 if (e != null) 264 processMessage(e, data); 265 } while (e != null); 266 } 267 268 /* 269 * (non-Javadoc) 270 * 271 * @see org.graphstream.stream.ProxyPipe#blockingPump() 272 */ 273 public void blockingPump() throws InterruptedException { 274 blockingPump(0); 275 } 276 277 public void blockingPump(long timeout) throws InterruptedException { 278 GraphEvents e; 279 Object[] data; 280 281 lock.lock(); 282 283 try { 284 if (timeout > 0) 285 while (events.size() == 0) 286 notEmpty.await(timeout, TimeUnit.MILLISECONDS); 287 else 288 while (events.size() == 0) 289 notEmpty.await(); 290 } finally { 291 lock.unlock(); 292 } 293 294 do { 295 lock.lock(); 296 297 try { 298 e = events.poll(); 299 data = eventsData.poll(); 300 } finally { 301 lock.unlock(); 302 } 303 304 if (e != null) 305 processMessage(e, data); 306 } while (e != null); 307 } 308 309 public boolean hasPostRemaining() { 310 boolean r = true; 311 lock.lock(); 312 313 try { 314 r = events.size() > 0; 315 } finally { 316 lock.unlock(); 317 } 318 319 return r; 320 } 321 322 /** 323 * Set of events sent via the message box. 324 */ 325 protected static enum GraphEvents { 326 ADD_NODE, DEL_NODE, ADD_EDGE, DEL_EDGE, STEP, CLEARED, ADD_GRAPH_ATTR, CHG_GRAPH_ATTR, DEL_GRAPH_ATTR, ADD_NODE_ATTR, CHG_NODE_ATTR, DEL_NODE_ATTR, ADD_EDGE_ATTR, CHG_EDGE_ATTR, DEL_EDGE_ATTR 327 }; 328 329 protected boolean maybeUnregister() { 330 if (unregisterWhenPossible) { 331 if (input != null) 332 input.removeSink(this); 333 return true; 334 } 335 336 return false; 337 } 338 339 protected void post(GraphEvents e, Object... data) { 340 lock.lock(); 341 342 try { 343 events.add(e); 344 eventsData.add(data); 345 346 notEmpty.signal(); 347 } finally { 348 lock.unlock(); 349 } 350 } 351 352 public void edgeAttributeAdded(String graphId, long timeId, String edgeId, 353 String attribute, Object value) { 354 if (maybeUnregister()) 355 return; 356 357 post(GraphEvents.ADD_EDGE_ATTR, graphId, timeId, edgeId, attribute, 358 value); 359 } 360 361 public void edgeAttributeChanged(String graphId, long timeId, 362 String edgeId, String attribute, Object oldValue, Object newValue) { 363 if (maybeUnregister()) 364 return; 365 366 post(GraphEvents.CHG_EDGE_ATTR, graphId, timeId, edgeId, attribute, 367 oldValue, newValue); 368 } 369 370 public void edgeAttributeRemoved(String graphId, long timeId, 371 String edgeId, String attribute) { 372 if (maybeUnregister()) 373 return; 374 375 post(GraphEvents.DEL_EDGE_ATTR, graphId, timeId, edgeId, attribute); 376 } 377 378 public void graphAttributeAdded(String graphId, long timeId, 379 String attribute, Object value) { 380 if (maybeUnregister()) 381 return; 382 383 post(GraphEvents.ADD_GRAPH_ATTR, graphId, timeId, attribute, value); 384 } 385 386 public void graphAttributeChanged(String graphId, long timeId, 387 String attribute, Object oldValue, Object newValue) { 388 if (maybeUnregister()) 389 return; 390 391 post(GraphEvents.CHG_GRAPH_ATTR, graphId, timeId, attribute, oldValue, 392 newValue); 393 } 394 395 public void graphAttributeRemoved(String graphId, long timeId, 396 String attribute) { 397 if (maybeUnregister()) 398 return; 399 400 post(GraphEvents.DEL_GRAPH_ATTR, graphId, timeId, attribute); 401 } 402 403 public void nodeAttributeAdded(String graphId, long timeId, String nodeId, 404 String attribute, Object value) { 405 if (maybeUnregister()) 406 return; 407 408 post(GraphEvents.ADD_NODE_ATTR, graphId, timeId, nodeId, attribute, 409 value); 410 } 411 412 public void nodeAttributeChanged(String graphId, long timeId, 413 String nodeId, String attribute, Object oldValue, Object newValue) { 414 if (maybeUnregister()) 415 return; 416 417 post(GraphEvents.CHG_NODE_ATTR, graphId, timeId, nodeId, attribute, 418 oldValue, newValue); 419 } 420 421 public void nodeAttributeRemoved(String graphId, long timeId, 422 String nodeId, String attribute) { 423 if (maybeUnregister()) 424 return; 425 426 post(GraphEvents.DEL_NODE_ATTR, graphId, timeId, nodeId, attribute); 427 } 428 429 public void edgeAdded(String graphId, long timeId, String edgeId, 430 String fromNodeId, String toNodeId, boolean directed) { 431 if (maybeUnregister()) 432 return; 433 434 post(GraphEvents.ADD_EDGE, graphId, timeId, edgeId, fromNodeId, 435 toNodeId, directed); 436 } 437 438 public void edgeRemoved(String graphId, long timeId, String edgeId) { 439 if (maybeUnregister()) 440 return; 441 442 post(GraphEvents.DEL_EDGE, graphId, timeId, edgeId); 443 } 444 445 public void graphCleared(String graphId, long timeId) { 446 if (maybeUnregister()) 447 return; 448 449 post(GraphEvents.CLEARED, graphId, timeId); 450 } 451 452 public void nodeAdded(String graphId, long timeId, String nodeId) { 453 if (maybeUnregister()) 454 return; 455 456 post(GraphEvents.ADD_NODE, graphId, timeId, nodeId); 457 } 458 459 public void nodeRemoved(String graphId, long timeId, String nodeId) { 460 if (maybeUnregister()) 461 return; 462 463 post(GraphEvents.DEL_NODE, graphId, timeId, nodeId); 464 } 465 466 public void stepBegins(String graphId, long timeId, double step) { 467 if (maybeUnregister()) 468 return; 469 470 post(GraphEvents.STEP, graphId, timeId, step); 471 } 472 473 // MBoxListener 474 475 public void processMessage(GraphEvents e, Object[] data) { 476 String graphId, elementId, attribute; 477 Long timeId; 478 Object newValue, oldValue; 479 480 switch (e) { 481 case ADD_NODE: 482 graphId = (String) data[0]; 483 timeId = (Long) data[1]; 484 elementId = (String) data[2]; 485 486 sendNodeAdded(graphId, timeId, elementId); 487 break; 488 case DEL_NODE: 489 graphId = (String) data[0]; 490 timeId = (Long) data[1]; 491 elementId = (String) data[2]; 492 493 sendNodeRemoved(graphId, timeId, elementId); 494 break; 495 case ADD_EDGE: 496 graphId = (String) data[0]; 497 timeId = (Long) data[1]; 498 elementId = (String) data[2]; 499 500 String fromId = (String) data[3]; 501 String toId = (String) data[4]; 502 boolean directed = (Boolean) data[5]; 503 504 sendEdgeAdded(graphId, timeId, elementId, fromId, toId, directed); 505 break; 506 case DEL_EDGE: 507 graphId = (String) data[0]; 508 timeId = (Long) data[1]; 509 elementId = (String) data[2]; 510 511 sendEdgeRemoved(graphId, timeId, elementId); 512 break; 513 case STEP: 514 graphId = (String) data[0]; 515 timeId = (Long) data[1]; 516 517 double step = (Double) data[2]; 518 519 sendStepBegins(graphId, timeId, step); 520 break; 521 case ADD_GRAPH_ATTR: 522 graphId = (String) data[0]; 523 timeId = (Long) data[1]; 524 attribute = (String) data[2]; 525 newValue = data[3]; 526 527 sendGraphAttributeAdded(graphId, timeId, attribute, newValue); 528 break; 529 case CHG_GRAPH_ATTR: 530 graphId = (String) data[0]; 531 timeId = (Long) data[1]; 532 attribute = (String) data[2]; 533 oldValue = data[3]; 534 newValue = data[4]; 535 536 sendGraphAttributeChanged(graphId, timeId, attribute, oldValue, 537 newValue); 538 break; 539 case DEL_GRAPH_ATTR: 540 graphId = (String) data[0]; 541 timeId = (Long) data[1]; 542 attribute = (String) data[2]; 543 544 sendGraphAttributeRemoved(graphId, timeId, attribute); 545 break; 546 case ADD_EDGE_ATTR: 547 graphId = (String) data[0]; 548 timeId = (Long) data[1]; 549 elementId = (String) data[2]; 550 attribute = (String) data[3]; 551 newValue = data[4]; 552 553 sendEdgeAttributeAdded(graphId, timeId, elementId, attribute, 554 newValue); 555 break; 556 case CHG_EDGE_ATTR: 557 graphId = (String) data[0]; 558 timeId = (Long) data[1]; 559 elementId = (String) data[2]; 560 attribute = (String) data[3]; 561 oldValue = data[4]; 562 newValue = data[5]; 563 564 sendEdgeAttributeChanged(graphId, timeId, elementId, attribute, 565 oldValue, newValue); 566 break; 567 case DEL_EDGE_ATTR: 568 graphId = (String) data[0]; 569 timeId = (Long) data[1]; 570 elementId = (String) data[2]; 571 attribute = (String) data[3]; 572 573 sendEdgeAttributeRemoved(graphId, timeId, elementId, attribute); 574 break; 575 case ADD_NODE_ATTR: 576 graphId = (String) data[0]; 577 timeId = (Long) data[1]; 578 elementId = (String) data[2]; 579 attribute = (String) data[3]; 580 newValue = data[4]; 581 582 sendNodeAttributeAdded(graphId, timeId, elementId, attribute, 583 newValue); 584 break; 585 case CHG_NODE_ATTR: 586 graphId = (String) data[0]; 587 timeId = (Long) data[1]; 588 elementId = (String) data[2]; 589 attribute = (String) data[3]; 590 oldValue = data[4]; 591 newValue = data[5]; 592 593 sendNodeAttributeChanged(graphId, timeId, elementId, attribute, 594 oldValue, newValue); 595 break; 596 case DEL_NODE_ATTR: 597 graphId = (String) data[0]; 598 timeId = (Long) data[1]; 599 elementId = (String) data[2]; 600 attribute = (String) data[3]; 601 602 sendNodeAttributeRemoved(graphId, timeId, elementId, attribute); 603 break; 604 case CLEARED: 605 graphId = (String) data[0]; 606 timeId = (Long) data[1]; 607 608 sendGraphCleared(graphId, timeId); 609 break; 610 default: 611 System.err.printf("ThreadProxy : Unknown message %s !!%n", e); 612 break; 613 } 614 } 615}