001/*
002 * Copyright 2006 - 2013
003 *     Stefan Balev     <stefan.balev@graphstream-project.org>
004 *     Julien Baudry    <julien.baudry@graphstream-project.org>
005 *     Antoine Dutot    <antoine.dutot@graphstream-project.org>
006 *     Yoann Pigné      <yoann.pigne@graphstream-project.org>
007 *     Guilhelm Savin   <guilhelm.savin@graphstream-project.org>
008 * 
009 * This file is part of GraphStream <http://graphstream-project.org>.
010 * 
011 * GraphStream is a library whose purpose is to handle static or dynamic
012 * graph, create them from scratch, file or any source and display them.
013 * 
014 * This program is free software distributed under the terms of two licenses, the
015 * CeCILL-C license that fits European law, and the GNU Lesser General Public
016 * License. You can  use, modify and/ or redistribute the software under the terms
017 * of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the following
018 * URL <http://www.cecill.info> or under the terms of the GNU LGPL as published by
019 * the Free Software Foundation, either version 3 of the License, or (at your
020 * option) any later version.
021 * 
022 * This program is distributed in the hope that it will be useful, but WITHOUT ANY
023 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
024 * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
025 * 
026 * You should have received a copy of the GNU Lesser General Public License
027 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
028 * 
029 * The fact that you are presently reading this means that you have had
030 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms.
031 */
032package org.graphstream.stream.thread;
033
034import org.graphstream.graph.Edge;
035import org.graphstream.graph.Graph;
036import org.graphstream.graph.Node;
037import org.graphstream.stream.ProxyPipe;
038import org.graphstream.stream.Sink;
039import org.graphstream.stream.Source;
040import org.graphstream.stream.SourceBase;
041import org.miv.mbox.CannotPostException;
042import org.miv.mbox.MBox;
043import org.miv.mbox.MBoxListener;
044import org.miv.mbox.MBoxStandalone;
045
046/**
047 * Filter that allows to pass graph events between two threads without explicit
048 * synchronization.
049 * 
050 * <p>
051 * This filter allows to register it as an output for some source of events in a
052 * source thread (hereafter called the input thread) and to register listening
053 * outputs in a destination thread (hereafter called the sink thread).
054 * </p>
055 * 
056 * <pre>
057 *                       |
058 *   Source ---> ThreadProxyFilter ----> Sink
059 *  Thread 1             |              Thread 2
060 *                       |
061 * </pre>
062 * 
063 * <p>
064 * In other words, this class allows to listen in a sink thread graph events
065 * that are produced in another source thread without any explicit
066 * synchronization on the source of events.
067 * </p>
068 * 
069 * <p>
070 * The only restriction is that the sink thread must regularly call the
071 * {@link #pump()} method to dispatch events coming from the source to all sinks
072 * registered (see the explanation in {@link org.graphstream.stream.ProxyPipe}).
073 * </p>
074 * 
075 * <p>
076 * You can register any kind of input as source of event, but if the input is a
077 * graph, then you can choose to "replay" all the content of the graph so that
078 * at the other end of the filter, all outputs receive the complete content of
079 * the graph. This is the default behavior if this filter is constructed with a
080 * graph as input.
081 * </p>
082 * 
083 * @deprecated This is the old version of
084 *             {@link org.graphstream.stream.thread.ThreadProxyPipe}.
085 */
086@Deprecated
087public class ThreadProxyPipeOld extends SourceBase implements ProxyPipe,
088                MBoxListener {
089
090        /**
091         * Proxy id.
092         */
093        protected String id;
094
095        /**
096         * The event sender name, usually the graph name.
097         */
098        protected String from;
099
100        /**
101         * The message box used to exchange messages between the two threads.
102         */
103        protected MBox events;
104
105        /**
106         * Used only to remove the listener. We ensure this is done in the source
107         * thread.
108         */
109        protected Source input;
110
111        /**
112         * Signals that this proxy must be removed from the source input.
113         */
114        protected boolean unregisterWhenPossible = false;
115
116        /**
117         * New thread proxy pipe with no input.
118         */
119        public ThreadProxyPipeOld() {
120                this((Source) null);
121        }
122
123        /**
124         * Listen at an input in a given thread and redirect all events to
125         * GraphListeners that may be in another thread.
126         * 
127         * @param input
128         *            The source of graph events we listen at.
129         */
130        public ThreadProxyPipeOld(Source input) {
131                this(input, new MBoxStandalone());
132        }
133
134        /**
135         * Like {@link #ThreadProxyPipe(Source)}, but allow to share the message box
136         * with another message processor. This can be needed to share the same
137         * message stack, when message order is important.
138         * 
139         * @param input
140         *            The source of events we listen at.
141         * @param sharedMBox
142         *            The message box used to send and receive graph messages across
143         *            the thread boundary.
144         */
145        public ThreadProxyPipeOld(Source input, MBox sharedMBox) {
146                this.events = sharedMBox;
147                this.from = "<in>";
148                this.input = input;
149
150                if (input != null)
151                        input.addSink(this);
152
153                ((MBoxStandalone) this.events).addListener(this);
154        }
155
156        /**
157         * Listen at an input graph in a given thread and redirect all events to
158         * GraphListeners that may be in another thread. By default, if the graph
159         * already contains some elements, they are "replayed". This means that
160         * events are sent to mimic the fact they just appeared.
161         * 
162         * @param inputGraph
163         *            The graph we listen at.
164         */
165        public ThreadProxyPipeOld(Graph inputGraph) {
166                this(inputGraph, true);
167        }
168
169        /**
170         * Like {@link #ThreadProxyPipe(Graph)} but allow to avoid replaying the
171         * graph.
172         * 
173         * @param inputGraph
174         *            The graph we listen at.
175         * @param replayGraph
176         *            If false, and if the input graph already contains element they
177         *            are not replayed.
178         */
179        public ThreadProxyPipeOld(Graph inputGraph, boolean replayGraph) {
180                this(inputGraph, null, replayGraph);
181        }
182
183        /**
184         * Like {@link #ThreadProxyPipe(Graph,boolean)} but allows to pass an
185         * initial listener, therefore specifying the input and output at once.
186         * 
187         * @param inputGraph
188         *            The graph we listen at.
189         * @param firstListener
190         *            The initial listener to register.
191         * @param replayGraph
192         *            If false, and if the input graph already contains element they
193         *            are not replayed.
194         */
195        public ThreadProxyPipeOld(Graph inputGraph, Sink firstListener,
196                        boolean replayGraph) {
197                this(inputGraph, firstListener, replayGraph, new MBoxStandalone());
198        }
199
200        /**
201         * Like {@link #ThreadProxyPipe(Graph,Sink,boolean)}, but allows to share
202         * the message box with another message processor. This can be needed to
203         * share the same message stack, when message order is important.
204         * 
205         * @param inputGraph
206         *            The graph we listen at.
207         * @param replayGraph
208         *            If false, and if the input graph already contains element they
209         *            are not replayed.
210         * @param sharedMBox
211         *            The message box used to send and receive graph messages across
212         *            the thread boundary.
213         */
214        public ThreadProxyPipeOld(Graph inputGraph, Sink firstListener,
215                        boolean replayGraph, MBox sharedMBox) {
216                this.events = sharedMBox;
217                this.from = inputGraph.getId();
218                this.input = inputGraph;
219
220                if (firstListener != null)
221                        addSink(firstListener);
222
223                if (replayGraph)
224                        replayGraph(inputGraph);
225
226                input.addSink(this);
227                ((MBoxStandalone) this.events).addListener(this);
228        }
229
230        @Override
231        public String toString() {
232                String dest = "nil";
233
234                if (attrSinks.size() > 0)
235                        dest = attrSinks.get(0).toString();
236
237                return String.format("thread-proxy(from %s to %s)", from, dest);
238        }
239
240        /**
241         * Ask the proxy to unregister from the event input source (stop receive
242         * events) as soon as possible (when the next event will occur in the
243         * graph).
244         */
245        public void unregisterFromSource() {
246                unregisterWhenPossible = true;
247        }
248
249        /**
250         * This method must be called regularly in the output thread to check if the
251         * input source sent events. If some event occurred, the listeners will be
252         * called.
253         */
254        public void pump() {
255                ((MBoxStandalone) events).processMessages();
256        }
257
258        /*
259         * (non-Javadoc)
260         * 
261         * @see org.graphstream.stream.ProxyPipe#blockingPump()
262         */
263        public void blockingPump() throws InterruptedException {
264                throw new UnsupportedOperationException();
265        }
266
267        /*
268         * (non-Javadoc)
269         * 
270         * @see org.graphstream.stream.ProxyPipe#blockingPump(long)
271         */
272        public void blockingPump(long timeout) throws InterruptedException {
273                throw new UnsupportedOperationException();
274        }
275
276        /**
277         * Set of events sent via the message box.
278         */
279        protected static enum GraphEvents {
280                ADD_NODE, DEL_NODE, ADD_EDGE, DEL_EDGE, STEP, CLEARED, ADD_GRAPH_ATTR, CHG_GRAPH_ATTR, DEL_GRAPH_ATTR, ADD_NODE_ATTR, CHG_NODE_ATTR, DEL_NODE_ATTR, ADD_EDGE_ATTR, CHG_EDGE_ATTR, DEL_EDGE_ATTR
281        };
282
283        protected void replayGraph(Graph graph) {
284                try {
285                        String graphId = "@replay";
286
287                        // Replay all graph attributes.
288
289                        if (graph.getAttributeKeySet() != null)
290                                for (String key : graph.getAttributeKeySet())
291                                        events.post(from, GraphEvents.ADD_GRAPH_ATTR, graphId,
292                                                        sourceTime.newEvent(), key, graph.getAttribute(key));
293
294                        Thread.yield();
295
296                        // Replay all nodes and their attributes.
297
298                        for (Node node : graph) {
299                                events.post(from, GraphEvents.ADD_NODE, graphId,
300                                                sourceTime.newEvent(), node.getId());
301
302                                if (node.getAttributeKeySet() != null)
303                                        for (String key : node.getAttributeKeySet())
304                                                events.post(from, GraphEvents.ADD_NODE_ATTR, graphId,
305                                                                sourceTime.newEvent(), node.getId(), key,
306                                                                node.getAttribute(key));
307                                Thread.yield();
308                        }
309
310                        // Replay all edges and their attributes.
311
312                        for (Edge edge : graph.getEachEdge()) {
313                                events.post(from, GraphEvents.ADD_EDGE, graphId, sourceTime
314                                                .newEvent(), edge.getId(),
315                                                edge.getSourceNode().getId(), edge.getTargetNode()
316                                                                .getId(), edge.isDirected());
317
318                                if (edge.getAttributeKeySet() != null)
319                                        for (String key : edge.getAttributeKeySet())
320                                                events.post(from, GraphEvents.ADD_EDGE_ATTR, graphId,
321                                                                sourceTime.newEvent(), edge.getId(), key,
322                                                                edge.getAttribute(key));
323                                Thread.yield();
324                        }
325                } catch (CannotPostException e) {
326                        System.err
327                                        .printf("GraphRendererRunner: cannot post message to listeners: %s%n",
328                                                        e.getMessage());
329                }
330        }
331
332        protected boolean maybeUnregister() {
333                if (unregisterWhenPossible) {
334                        if (input != null)
335                                input.removeSink(this);
336                        return true;
337                }
338
339                return false;
340        }
341
342        public void edgeAttributeAdded(String graphId, long timeId, String edgeId,
343                        String attribute, Object value) {
344                if (maybeUnregister())
345                        return;
346
347                try {
348                        events.post(from, GraphEvents.ADD_EDGE_ATTR, graphId, timeId,
349                                        edgeId, attribute, value);
350                } catch (CannotPostException e) {
351                        e.printStackTrace();
352                }
353        }
354
355        public void edgeAttributeChanged(String graphId, long timeId,
356                        String edgeId, String attribute, Object oldValue, Object newValue) {
357                if (maybeUnregister())
358                        return;
359
360                try {
361                        events.post(from, GraphEvents.CHG_EDGE_ATTR, graphId, timeId,
362                                        edgeId, attribute, oldValue, newValue);
363                } catch (CannotPostException e) {
364                        e.printStackTrace();
365                }
366        }
367
368        public void edgeAttributeRemoved(String graphId, long timeId,
369                        String edgeId, String attribute) {
370                if (maybeUnregister())
371                        return;
372
373                try {
374                        events.post(from, GraphEvents.DEL_EDGE_ATTR, graphId, timeId,
375                                        edgeId, attribute);
376                } catch (CannotPostException e) {
377                        e.printStackTrace();
378                }
379        }
380
381        public void graphAttributeAdded(String graphId, long timeId,
382                        String attribute, Object value) {
383                if (maybeUnregister())
384                        return;
385
386                try {
387                        events.post(from, GraphEvents.ADD_GRAPH_ATTR, graphId, timeId,
388                                        attribute, value);
389                } catch (CannotPostException e) {
390                        e.printStackTrace();
391                }
392        }
393
394        public void graphAttributeChanged(String graphId, long timeId,
395                        String attribute, Object oldValue, Object newValue) {
396                if (maybeUnregister())
397                        return;
398
399                try {
400                        events.post(from, GraphEvents.CHG_GRAPH_ATTR, graphId, timeId,
401                                        attribute, oldValue, newValue);
402                } catch (CannotPostException e) {
403                        e.printStackTrace();
404                }
405        }
406
407        public void graphAttributeRemoved(String graphId, long timeId,
408                        String attribute) {
409                if (maybeUnregister())
410                        return;
411
412                try {
413                        events.post(from, GraphEvents.DEL_GRAPH_ATTR, graphId, timeId,
414                                        attribute);
415                } catch (CannotPostException e) {
416                        e.printStackTrace();
417                }
418        }
419
420        public void nodeAttributeAdded(String graphId, long timeId, String nodeId,
421                        String attribute, Object value) {
422                if (maybeUnregister())
423                        return;
424
425                try {
426                        events.post(from, GraphEvents.ADD_NODE_ATTR, graphId, timeId,
427                                        nodeId, attribute, value);
428                } catch (CannotPostException e) {
429                        e.printStackTrace();
430                }
431        }
432
433        public void nodeAttributeChanged(String graphId, long timeId,
434                        String nodeId, String attribute, Object oldValue, Object newValue) {
435                if (maybeUnregister())
436                        return;
437
438                try {
439                        events.post(from, GraphEvents.CHG_NODE_ATTR, graphId, timeId,
440                                        nodeId, attribute, oldValue, newValue);
441                } catch (CannotPostException e) {
442                        e.printStackTrace();
443                }
444        }
445
446        public void nodeAttributeRemoved(String graphId, long timeId,
447                        String nodeId, String attribute) {
448                if (maybeUnregister())
449                        return;
450
451                try {
452                        events.post(from, GraphEvents.DEL_NODE_ATTR, graphId, timeId,
453                                        nodeId, attribute);
454                } catch (CannotPostException e) {
455                        e.printStackTrace();
456                }
457        }
458
459        public void edgeAdded(String graphId, long timeId, String edgeId,
460                        String fromNodeId, String toNodeId, boolean directed) {
461                if (maybeUnregister())
462                        return;
463
464                try {
465                        events.post(from, GraphEvents.ADD_EDGE, graphId, timeId, edgeId,
466                                        fromNodeId, toNodeId, directed);
467                } catch (CannotPostException e) {
468                        e.printStackTrace();
469                }
470        }
471
472        public void edgeRemoved(String graphId, long timeId, String edgeId) {
473                if (maybeUnregister())
474                        return;
475
476                try {
477                        events.post(from, GraphEvents.DEL_EDGE, graphId, timeId, edgeId);
478                } catch (CannotPostException e) {
479                        e.printStackTrace();
480                }
481        }
482
483        public void graphCleared(String graphId, long timeId) {
484                if (maybeUnregister())
485                        return;
486
487                try {
488                        events.post(from, GraphEvents.CLEARED, graphId, timeId);
489                } catch (CannotPostException e) {
490                        e.printStackTrace();
491                }
492        }
493
494        public void nodeAdded(String graphId, long timeId, String nodeId) {
495                if (maybeUnregister())
496                        return;
497
498                try {
499                        events.post(from, GraphEvents.ADD_NODE, graphId, timeId, nodeId);
500                } catch (CannotPostException e) {
501                        e.printStackTrace();
502                }
503        }
504
505        public void nodeRemoved(String graphId, long timeId, String nodeId) {
506                if (maybeUnregister())
507                        return;
508
509                try {
510                        events.post(from, GraphEvents.DEL_NODE, graphId, timeId, nodeId);
511                } catch (CannotPostException e) {
512                        e.printStackTrace();
513                }
514        }
515
516        public void stepBegins(String graphId, long timeId, double step) {
517                if (maybeUnregister())
518                        return;
519
520                try {
521                        events.post(from, GraphEvents.STEP, graphId, timeId, step);
522                } catch (CannotPostException e) {
523                        e.printStackTrace();
524                }
525        }
526
527        // MBoxListener
528
529        public void processMessage(String from, Object[] data) {
530                // System.err.printf( "    %s.msg(%s, %s, %s, %s)%n", from, data[1],
531                // data[2], data[0], data[3] );
532                if (data[0].equals(GraphEvents.ADD_NODE)) {
533                        String graphId = (String) data[1];
534                        Long timeId = (Long) data[2];
535                        String nodeId = (String) data[3];
536
537                        sendNodeAdded(graphId, timeId, nodeId);
538                } else if (data[0].equals(GraphEvents.DEL_NODE)) {
539                        String graphId = (String) data[1];
540                        Long timeId = (Long) data[2];
541                        String nodeId = (String) data[3];
542
543                        sendNodeRemoved(graphId, timeId, nodeId);
544                } else if (data[0].equals(GraphEvents.ADD_EDGE)) {
545                        String graphId = (String) data[1];
546                        Long timeId = (Long) data[2];
547                        String edgeId = (String) data[3];
548                        String fromId = (String) data[4];
549                        String toId = (String) data[5];
550                        boolean directed = (Boolean) data[6];
551
552                        sendEdgeAdded(graphId, timeId, edgeId, fromId, toId, directed);
553                } else if (data[0].equals(GraphEvents.DEL_EDGE)) {
554                        String graphId = (String) data[1];
555                        Long timeId = (Long) data[2];
556                        String edgeId = (String) data[3];
557
558                        sendEdgeRemoved(graphId, timeId, edgeId);
559                } else if (data[0].equals(GraphEvents.STEP)) {
560                        String graphId = (String) data[1];
561                        Long timeId = (Long) data[2];
562                        double step = (Double) data[3];
563
564                        sendStepBegins(graphId, timeId, step);
565                } else if (data[0].equals(GraphEvents.ADD_GRAPH_ATTR)) {
566                        String graphId = (String) data[1];
567                        Long timeId = (Long) data[2];
568                        String attribute = (String) data[3];
569                        Object value = data[4];
570
571                        sendGraphAttributeAdded(graphId, timeId, attribute, value);
572                } else if (data[0].equals(GraphEvents.CHG_GRAPH_ATTR)) {
573                        String graphId = (String) data[1];
574                        Long timeId = (Long) data[2];
575                        String attribute = (String) data[3];
576                        Object oldValue = data[4];
577                        Object newValue = data[5];
578
579                        sendGraphAttributeChanged(graphId, timeId, attribute, oldValue,
580                                        newValue);
581                } else if (data[0].equals(GraphEvents.DEL_GRAPH_ATTR)) {
582                        String graphId = (String) data[1];
583                        Long timeId = (Long) data[2];
584                        String attribute = (String) data[3];
585
586                        sendGraphAttributeRemoved(graphId, timeId, attribute);
587                } else if (data[0].equals(GraphEvents.ADD_EDGE_ATTR)) {
588                        String graphId = (String) data[1];
589                        Long timeId = (Long) data[2];
590                        String edgeId = (String) data[3];
591                        String attribute = (String) data[4];
592                        Object value = data[5];
593
594                        sendEdgeAttributeAdded(graphId, timeId, edgeId, attribute, value);
595                } else if (data[0].equals(GraphEvents.CHG_EDGE_ATTR)) {
596                        String graphId = (String) data[1];
597                        Long timeId = (Long) data[2];
598                        String edgeId = (String) data[3];
599                        String attribute = (String) data[4];
600                        Object oldValue = data[5];
601                        Object newValue = data[6];
602
603                        sendEdgeAttributeChanged(graphId, timeId, edgeId, attribute,
604                                        oldValue, newValue);
605                } else if (data[0].equals(GraphEvents.DEL_EDGE_ATTR)) {
606                        String graphId = (String) data[1];
607                        Long timeId = (Long) data[2];
608                        String edgeId = (String) data[3];
609                        String attribute = (String) data[4];
610
611                        sendEdgeAttributeRemoved(graphId, timeId, edgeId, attribute);
612                } else if (data[0].equals(GraphEvents.ADD_NODE_ATTR)) {
613                        String graphId = (String) data[1];
614                        Long timeId = (Long) data[2];
615                        String nodeId = (String) data[3];
616                        String attribute = (String) data[4];
617                        Object value = data[5];
618
619                        sendNodeAttributeAdded(graphId, timeId, nodeId, attribute, value);
620                } else if (data[0].equals(GraphEvents.CHG_NODE_ATTR)) {
621                        String graphId = (String) data[1];
622                        Long timeId = (Long) data[2];
623                        String nodeId = (String) data[3];
624                        String attribute = (String) data[4];
625                        Object oldValue = data[5];
626                        Object newValue = data[6];
627
628                        sendNodeAttributeChanged(graphId, timeId, nodeId, attribute,
629                                        oldValue, newValue);
630                } else if (data[0].equals(GraphEvents.DEL_NODE_ATTR)) {
631                        String graphId = (String) data[1];
632                        Long timeId = (Long) data[2];
633                        String nodeId = (String) data[3];
634                        String attribute = (String) data[4];
635
636                        sendNodeAttributeRemoved(graphId, timeId, nodeId, attribute);
637                } else if (data[0].equals(GraphEvents.CLEARED)) {
638                        String graphId = (String) data[1];
639                        Long timeId = (Long) data[2];
640
641                        sendGraphCleared(graphId, timeId);
642                } else {
643                        System.err.printf("ThreadProxyFilter : Unknown message %s !!%n",
644                                        data[0]);
645                }
646        }
647}