001/*
002 * Copyright 2006 - 2013
003 *     Stefan Balev     <stefan.balev@graphstream-project.org>
004 *     Julien Baudry    <julien.baudry@graphstream-project.org>
005 *     Antoine Dutot    <antoine.dutot@graphstream-project.org>
006 *     Yoann Pigné      <yoann.pigne@graphstream-project.org>
007 *     Guilhelm Savin   <guilhelm.savin@graphstream-project.org>
008 * 
009 * This file is part of GraphStream <http://graphstream-project.org>.
010 * 
011 * GraphStream is a library whose purpose is to handle static or dynamic
012 * graph, create them from scratch, file or any source and display them.
013 * 
014 * This program is free software distributed under the terms of two licenses, the
015 * CeCILL-C license that fits European law, and the GNU Lesser General Public
016 * License. You can  use, modify and/ or redistribute the software under the terms
017 * of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the following
018 * URL <http://www.cecill.info> or under the terms of the GNU LGPL as published by
019 * the Free Software Foundation, either version 3 of the License, or (at your
020 * option) any later version.
021 * 
022 * This program is distributed in the hope that it will be useful, but WITHOUT ANY
023 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
024 * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
025 * 
026 * You should have received a copy of the GNU Lesser General Public License
027 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
028 * 
029 * The fact that you are presently reading this means that you have had
030 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms.
031 */
032package org.graphstream.stream.thread;
033
034import java.util.LinkedList;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.locks.Condition;
037import java.util.concurrent.locks.ReentrantLock;
038
039import org.graphstream.graph.Graph;
040import org.graphstream.stream.ProxyPipe;
041import org.graphstream.stream.Replayable;
042import org.graphstream.stream.Replayable.Controller;
043import org.graphstream.stream.Sink;
044import org.graphstream.stream.Source;
045import org.graphstream.stream.SourceBase;
046
047/**
048 * Filter that allows to pass graph events between two threads without explicit
049 * synchronization.
050 * 
051 * <p>
052 * This filter allows to register it as an output for some source of events in a
053 * source thread (hereafter called the input thread) and to register listening
054 * outputs in a destination thread (hereafter called the sink thread).
055 * </p>
056 * 
057 * <pre>
058 *                       |
059 *   Source ---> ThreadProxyFilter ----> Sink
060 *  Thread 1             |              Thread 2
061 *                       |
062 * </pre>
063 * 
064 * <p>
065 * In other words, this class allows to listen in a sink thread graph events
066 * that are produced in another source thread without any explicit
067 * synchronization on the source of events.
068 * </p>
069 * 
070 * <p>
071 * The only restriction is that the sink thread must regularly call the
072 * {@link #pump()} method to dispatch events coming from the source to all sinks
073 * registered (see the explanation in {@link org.graphstream.stream.ProxyPipe}).
074 * </p>
075 * 
076 * <p>
077 * You can register any kind of input as source of event, but if the input is a
078 * graph, then you can choose to "replay" all the content of the graph so that
079 * at the other end of the filter, all outputs receive the complete content of
080 * the graph. This is the default behavior if this filter is constructed with a
081 * graph as input.
082 * </p>
083 */
084public class ThreadProxyPipe extends SourceBase implements ProxyPipe {
085
086        /**
087         * Proxy id.
088         */
089        protected String id;
090
091        /**
092         * The event sender name, usually the graph name.
093         */
094        protected String from;
095
096        /**
097         * The message box used to exchange messages between the two threads.
098         */
099        protected LinkedList<GraphEvents> events;
100        protected LinkedList<Object[]> eventsData;
101
102        protected ReentrantLock lock;
103        protected Condition notEmpty;
104
105        /**
106         * Used only to remove the listener. We ensure this is done in the source
107         * thread.
108         */
109        protected Source input;
110
111        /**
112         * Signals that this proxy must be removed from the source input.
113         */
114        protected boolean unregisterWhenPossible = false;
115
116        public ThreadProxyPipe() {
117                this.events = new LinkedList<GraphEvents>();
118                this.eventsData = new LinkedList<Object[]>();
119                this.lock = new ReentrantLock();
120                this.notEmpty = this.lock.newCondition();
121                this.from = "<in>";
122                this.input = null;
123        }
124
125        /**
126         * 
127         * @param input
128         *            The source of events we listen at.
129         * 
130         * @deprecated Use the default constructor and then call the
131         *             {@link #init(Source)} method.
132         */
133        @Deprecated
134        public ThreadProxyPipe(Source input) {
135                this(input, null, input instanceof Replayable);
136        }
137
138        /**
139         * 
140         * @param input
141         * @param replay
142         * 
143         * @deprecated Use the default constructor and then call the
144         *             {@link #init(Source)} method.
145         */
146        @Deprecated
147        public ThreadProxyPipe(Source input, boolean replay) {
148                this(input, null, replay);
149        }
150
151        /**
152         * 
153         * @param input
154         * @param initialListener
155         * @param replay
156         * 
157         * @deprecated Use the default constructor and then call the
158         *             {@link #init(Source)} method.
159         */
160        @Deprecated
161        public ThreadProxyPipe(Source input, Sink initialListener, boolean replay) {
162                this();
163
164                if (initialListener != null)
165                        addSink(initialListener);
166
167                init(input, replay);
168        }
169
170        public void init() {
171                init(null, false);
172        }
173
174        /**
175         * Init the proxy. If there are previous events, they will be cleared.
176         * 
177         * @param source
178         *            source of the events
179         */
180        public void init(Source source) {
181                init(source, source instanceof Replayable);
182        }
183
184        /**
185         * Init the proxy. If there are previous events, they will be cleared.
186         * 
187         * @param source
188         *            source of the events
189         * @param replay
190         *            true if the source should be replayed. You need a
191         *            {@link org.graphstream.stream.Replayable} source to enable
192         *            replay, else nothing happens.
193         */
194        public void init(Source source, boolean replay) {
195                lock.lock();
196
197                try {
198                        if (this.input != null)
199                                this.input.removeSink(this);
200
201                        this.input = source;
202
203                        this.events.clear();
204                        this.eventsData.clear();
205                } finally {
206                        lock.unlock();
207                }
208
209                if (source != null) {
210                        if (source instanceof Graph)
211                                this.from = ((Graph) source).getId();
212
213                        this.input.addSink(this);
214
215                        if (replay && source instanceof Replayable) {
216                                Replayable r = (Replayable) source;
217                                Controller rc = r.getReplayController();
218
219                                rc.addSink(this);
220                                rc.replay();
221                        }
222                }
223        }
224
225        @Override
226        public String toString() {
227                String dest = "nil";
228
229                if (attrSinks.size() > 0)
230                        dest = attrSinks.get(0).toString();
231
232                return String.format("thread-proxy(from %s to %s)", from, dest);
233        }
234
235        /**
236         * Ask the proxy to unregister from the event input source (stop receive
237         * events) as soon as possible (when the next event will occur in the
238         * graph).
239         */
240        public void unregisterFromSource() {
241                unregisterWhenPossible = true;
242        }
243
244        /**
245         * This method must be called regularly in the output thread to check if the
246         * input source sent events. If some event occurred, the listeners will be
247         * called.
248         */
249        public void pump() {
250                GraphEvents e = null;
251                Object[] data = null;
252
253                do {
254                        lock.lock();
255
256                        try {
257                                e = events.poll();
258                                data = eventsData.poll();
259                        } finally {
260                                lock.unlock();
261                        }
262
263                        if (e != null)
264                                processMessage(e, data);
265                } while (e != null);
266        }
267
268        /*
269         * (non-Javadoc)
270         * 
271         * @see org.graphstream.stream.ProxyPipe#blockingPump()
272         */
273        public void blockingPump() throws InterruptedException {
274                blockingPump(0);
275        }
276
277        public void blockingPump(long timeout) throws InterruptedException {
278                GraphEvents e;
279                Object[] data;
280
281                lock.lock();
282
283                try {
284                        if (timeout > 0)
285                                while (events.size() == 0)
286                                        notEmpty.await(timeout, TimeUnit.MILLISECONDS);
287                        else
288                                while (events.size() == 0)
289                                        notEmpty.await();
290                } finally {
291                        lock.unlock();
292                }
293
294                do {
295                        lock.lock();
296
297                        try {
298                                e = events.poll();
299                                data = eventsData.poll();
300                        } finally {
301                                lock.unlock();
302                        }
303
304                        if (e != null)
305                                processMessage(e, data);
306                } while (e != null);
307        }
308
309        public boolean hasPostRemaining() {
310                boolean r = true;
311                lock.lock();
312
313                try {
314                        r = events.size() > 0;
315                } finally {
316                        lock.unlock();
317                }
318
319                return r;
320        }
321
322        /**
323         * Set of events sent via the message box.
324         */
325        protected static enum GraphEvents {
326                ADD_NODE, DEL_NODE, ADD_EDGE, DEL_EDGE, STEP, CLEARED, ADD_GRAPH_ATTR, CHG_GRAPH_ATTR, DEL_GRAPH_ATTR, ADD_NODE_ATTR, CHG_NODE_ATTR, DEL_NODE_ATTR, ADD_EDGE_ATTR, CHG_EDGE_ATTR, DEL_EDGE_ATTR
327        };
328
329        protected boolean maybeUnregister() {
330                if (unregisterWhenPossible) {
331                        if (input != null)
332                                input.removeSink(this);
333                        return true;
334                }
335
336                return false;
337        }
338
339        protected void post(GraphEvents e, Object... data) {
340                lock.lock();
341
342                try {
343                        events.add(e);
344                        eventsData.add(data);
345
346                        notEmpty.signal();
347                } finally {
348                        lock.unlock();
349                }
350        }
351
352        public void edgeAttributeAdded(String graphId, long timeId, String edgeId,
353                        String attribute, Object value) {
354                if (maybeUnregister())
355                        return;
356
357                post(GraphEvents.ADD_EDGE_ATTR, graphId, timeId, edgeId, attribute,
358                                value);
359        }
360
361        public void edgeAttributeChanged(String graphId, long timeId,
362                        String edgeId, String attribute, Object oldValue, Object newValue) {
363                if (maybeUnregister())
364                        return;
365
366                post(GraphEvents.CHG_EDGE_ATTR, graphId, timeId, edgeId, attribute,
367                                oldValue, newValue);
368        }
369
370        public void edgeAttributeRemoved(String graphId, long timeId,
371                        String edgeId, String attribute) {
372                if (maybeUnregister())
373                        return;
374
375                post(GraphEvents.DEL_EDGE_ATTR, graphId, timeId, edgeId, attribute);
376        }
377
378        public void graphAttributeAdded(String graphId, long timeId,
379                        String attribute, Object value) {
380                if (maybeUnregister())
381                        return;
382
383                post(GraphEvents.ADD_GRAPH_ATTR, graphId, timeId, attribute, value);
384        }
385
386        public void graphAttributeChanged(String graphId, long timeId,
387                        String attribute, Object oldValue, Object newValue) {
388                if (maybeUnregister())
389                        return;
390
391                post(GraphEvents.CHG_GRAPH_ATTR, graphId, timeId, attribute, oldValue,
392                                newValue);
393        }
394
395        public void graphAttributeRemoved(String graphId, long timeId,
396                        String attribute) {
397                if (maybeUnregister())
398                        return;
399
400                post(GraphEvents.DEL_GRAPH_ATTR, graphId, timeId, attribute);
401        }
402
403        public void nodeAttributeAdded(String graphId, long timeId, String nodeId,
404                        String attribute, Object value) {
405                if (maybeUnregister())
406                        return;
407
408                post(GraphEvents.ADD_NODE_ATTR, graphId, timeId, nodeId, attribute,
409                                value);
410        }
411
412        public void nodeAttributeChanged(String graphId, long timeId,
413                        String nodeId, String attribute, Object oldValue, Object newValue) {
414                if (maybeUnregister())
415                        return;
416
417                post(GraphEvents.CHG_NODE_ATTR, graphId, timeId, nodeId, attribute,
418                                oldValue, newValue);
419        }
420
421        public void nodeAttributeRemoved(String graphId, long timeId,
422                        String nodeId, String attribute) {
423                if (maybeUnregister())
424                        return;
425
426                post(GraphEvents.DEL_NODE_ATTR, graphId, timeId, nodeId, attribute);
427        }
428
429        public void edgeAdded(String graphId, long timeId, String edgeId,
430                        String fromNodeId, String toNodeId, boolean directed) {
431                if (maybeUnregister())
432                        return;
433
434                post(GraphEvents.ADD_EDGE, graphId, timeId, edgeId, fromNodeId,
435                                toNodeId, directed);
436        }
437
438        public void edgeRemoved(String graphId, long timeId, String edgeId) {
439                if (maybeUnregister())
440                        return;
441
442                post(GraphEvents.DEL_EDGE, graphId, timeId, edgeId);
443        }
444
445        public void graphCleared(String graphId, long timeId) {
446                if (maybeUnregister())
447                        return;
448
449                post(GraphEvents.CLEARED, graphId, timeId);
450        }
451
452        public void nodeAdded(String graphId, long timeId, String nodeId) {
453                if (maybeUnregister())
454                        return;
455
456                post(GraphEvents.ADD_NODE, graphId, timeId, nodeId);
457        }
458
459        public void nodeRemoved(String graphId, long timeId, String nodeId) {
460                if (maybeUnregister())
461                        return;
462
463                post(GraphEvents.DEL_NODE, graphId, timeId, nodeId);
464        }
465
466        public void stepBegins(String graphId, long timeId, double step) {
467                if (maybeUnregister())
468                        return;
469
470                post(GraphEvents.STEP, graphId, timeId, step);
471        }
472
473        // MBoxListener
474
475        public void processMessage(GraphEvents e, Object[] data) {
476                String graphId, elementId, attribute;
477                Long timeId;
478                Object newValue, oldValue;
479
480                switch (e) {
481                case ADD_NODE:
482                        graphId = (String) data[0];
483                        timeId = (Long) data[1];
484                        elementId = (String) data[2];
485
486                        sendNodeAdded(graphId, timeId, elementId);
487                        break;
488                case DEL_NODE:
489                        graphId = (String) data[0];
490                        timeId = (Long) data[1];
491                        elementId = (String) data[2];
492
493                        sendNodeRemoved(graphId, timeId, elementId);
494                        break;
495                case ADD_EDGE:
496                        graphId = (String) data[0];
497                        timeId = (Long) data[1];
498                        elementId = (String) data[2];
499
500                        String fromId = (String) data[3];
501                        String toId = (String) data[4];
502                        boolean directed = (Boolean) data[5];
503
504                        sendEdgeAdded(graphId, timeId, elementId, fromId, toId, directed);
505                        break;
506                case DEL_EDGE:
507                        graphId = (String) data[0];
508                        timeId = (Long) data[1];
509                        elementId = (String) data[2];
510
511                        sendEdgeRemoved(graphId, timeId, elementId);
512                        break;
513                case STEP:
514                        graphId = (String) data[0];
515                        timeId = (Long) data[1];
516
517                        double step = (Double) data[2];
518
519                        sendStepBegins(graphId, timeId, step);
520                        break;
521                case ADD_GRAPH_ATTR:
522                        graphId = (String) data[0];
523                        timeId = (Long) data[1];
524                        attribute = (String) data[2];
525                        newValue = data[3];
526
527                        sendGraphAttributeAdded(graphId, timeId, attribute, newValue);
528                        break;
529                case CHG_GRAPH_ATTR:
530                        graphId = (String) data[0];
531                        timeId = (Long) data[1];
532                        attribute = (String) data[2];
533                        oldValue = data[3];
534                        newValue = data[4];
535
536                        sendGraphAttributeChanged(graphId, timeId, attribute, oldValue,
537                                        newValue);
538                        break;
539                case DEL_GRAPH_ATTR:
540                        graphId = (String) data[0];
541                        timeId = (Long) data[1];
542                        attribute = (String) data[2];
543
544                        sendGraphAttributeRemoved(graphId, timeId, attribute);
545                        break;
546                case ADD_EDGE_ATTR:
547                        graphId = (String) data[0];
548                        timeId = (Long) data[1];
549                        elementId = (String) data[2];
550                        attribute = (String) data[3];
551                        newValue = data[4];
552
553                        sendEdgeAttributeAdded(graphId, timeId, elementId, attribute,
554                                        newValue);
555                        break;
556                case CHG_EDGE_ATTR:
557                        graphId = (String) data[0];
558                        timeId = (Long) data[1];
559                        elementId = (String) data[2];
560                        attribute = (String) data[3];
561                        oldValue = data[4];
562                        newValue = data[5];
563
564                        sendEdgeAttributeChanged(graphId, timeId, elementId, attribute,
565                                        oldValue, newValue);
566                        break;
567                case DEL_EDGE_ATTR:
568                        graphId = (String) data[0];
569                        timeId = (Long) data[1];
570                        elementId = (String) data[2];
571                        attribute = (String) data[3];
572
573                        sendEdgeAttributeRemoved(graphId, timeId, elementId, attribute);
574                        break;
575                case ADD_NODE_ATTR:
576                        graphId = (String) data[0];
577                        timeId = (Long) data[1];
578                        elementId = (String) data[2];
579                        attribute = (String) data[3];
580                        newValue = data[4];
581
582                        sendNodeAttributeAdded(graphId, timeId, elementId, attribute,
583                                        newValue);
584                        break;
585                case CHG_NODE_ATTR:
586                        graphId = (String) data[0];
587                        timeId = (Long) data[1];
588                        elementId = (String) data[2];
589                        attribute = (String) data[3];
590                        oldValue = data[4];
591                        newValue = data[5];
592
593                        sendNodeAttributeChanged(graphId, timeId, elementId, attribute,
594                                        oldValue, newValue);
595                        break;
596                case DEL_NODE_ATTR:
597                        graphId = (String) data[0];
598                        timeId = (Long) data[1];
599                        elementId = (String) data[2];
600                        attribute = (String) data[3];
601
602                        sendNodeAttributeRemoved(graphId, timeId, elementId, attribute);
603                        break;
604                case CLEARED:
605                        graphId = (String) data[0];
606                        timeId = (Long) data[1];
607
608                        sendGraphCleared(graphId, timeId);
609                        break;
610                default:
611                        System.err.printf("ThreadProxy : Unknown message %s !!%n", e);
612                        break;
613                }
614        }
615}