001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 * 
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 * 
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.log4j.net;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.io.ObjectOutputStream;
023import java.net.InetAddress;
024import java.net.ServerSocket;
025import java.net.Socket;
026import java.net.SocketException;
027import java.util.Vector;
028
029import org.apache.log4j.AppenderSkeleton;
030import org.apache.log4j.helpers.CyclicBuffer;
031import org.apache.log4j.helpers.LogLog;
032import org.apache.log4j.spi.LoggingEvent;
033
034/**
035  Sends {@link LoggingEvent} objects to a set of remote log servers,
036  usually a {@link SocketNode SocketNodes}.
037    
038  <p>Acts just like {@link SocketAppender} except that instead of
039  connecting to a given remote log server,
040  <code>SocketHubAppender</code> accepts connections from the remote
041  log servers as clients.  It can accept more than one connection.
042  When a log event is received, the event is sent to the set of
043  currently connected remote log servers. Implemented this way it does
044  not require any update to the configuration file to send data to
045  another remote log server. The remote log server simply connects to
046  the host and port the <code>SocketHubAppender</code> is running on.
047  
048  <p>The <code>SocketHubAppender</code> does not store events such
049  that the remote side will events that arrived after the
050  establishment of its connection. Once connected, events arrive in
051  order as guaranteed by the TCP protocol.
052
053  <p>This implementation borrows heavily from the {@link
054  SocketAppender}.
055
056  <p>The SocketHubAppender has the following characteristics:
057  
058  <ul>
059  
060  <p><li>If sent to a {@link SocketNode}, logging is non-intrusive as
061  far as the log event is concerned. In other words, the event will be
062  logged with the same time stamp, {@link org.apache.log4j.NDC},
063  location info as if it were logged locally.
064  
065  <p><li><code>SocketHubAppender</code> does not use a layout. It
066  ships a serialized {@link LoggingEvent} object to the remote side.
067  
068  <p><li><code>SocketHubAppender</code> relies on the TCP
069  protocol. Consequently, if the remote side is reachable, then log
070  events will eventually arrive at remote client.
071  
072  <p><li>If no remote clients are attached, the logging requests are
073  simply dropped.
074  
075  <p><li>Logging events are automatically <em>buffered</em> by the
076  native TCP implementation. This means that if the link to remote
077  client is slow but still faster than the rate of (log) event
078  production, the application will not be affected by the slow network
079  connection. However, if the network connection is slower then the
080  rate of event production, then the local application can only
081  progress at the network rate. In particular, if the network link to
082  the the remote client is down, the application will be blocked.
083  
084  <p>On the other hand, if the network link is up, but the remote
085  client is down, the client will not be blocked when making log
086  requests but the log events will be lost due to client
087  unavailability. 
088
089  <p>The single remote client case extends to multiple clients
090  connections. The rate of logging will be determined by the slowest
091  link.
092    
093  <p><li>If the JVM hosting the <code>SocketHubAppender</code> exits
094  before the <code>SocketHubAppender</code> is closed either
095  explicitly or subsequent to garbage collection, then there might
096  be untransmitted data in the pipe which might be lost. This is a
097  common problem on Windows based systems.
098  
099  <p>To avoid lost data, it is usually sufficient to {@link #close}
100  the <code>SocketHubAppender</code> either explicitly or by calling
101  the {@link org.apache.log4j.LogManager#shutdown} method before
102  exiting the application.
103  
104  </ul>
105     
106  @author Mark Womack */
107
108public class SocketHubAppender extends AppenderSkeleton {
109
110  /**
111     The default port number of the ServerSocket will be created on. */
112  static final int DEFAULT_PORT = 4560;
113  
114  private int port = DEFAULT_PORT;
115  private Vector oosList = new Vector();
116  private ServerMonitor serverMonitor = null;
117  private boolean locationInfo = false;
118  private CyclicBuffer buffer = null;
119  private String application;
120  private boolean advertiseViaMulticastDNS;
121  private ZeroConfSupport zeroConf;
122
123  /**
124   * The MulticastDNS zone advertised by a SocketHubAppender
125   */
126  public static final String ZONE = "_log4j_obj_tcpaccept_appender.local.";
127  private ServerSocket serverSocket;
128
129
130    public SocketHubAppender() { }
131
132  /**
133     Connects to remote server at <code>address</code> and <code>port</code>. */
134  public
135  SocketHubAppender(int _port) {
136    port = _port;
137    startServer();
138  }
139
140  /**
141     Set up the socket server on the specified port.  */
142  public
143  void activateOptions() {
144    if (advertiseViaMulticastDNS) {
145      zeroConf = new ZeroConfSupport(ZONE, port, getName());
146      zeroConf.advertise();
147    }
148    startServer();
149  }
150
151  /**
152     Close this appender. 
153     <p>This will mark the appender as closed and
154     call then {@link #cleanUp} method. */
155  synchronized
156  public
157  void close() {
158    if(closed)
159      return;
160
161        LogLog.debug("closing SocketHubAppender " + getName());
162    this.closed = true;
163    if (advertiseViaMulticastDNS) {
164      zeroConf.unadvertise();
165    }
166    cleanUp();
167
168        LogLog.debug("SocketHubAppender " + getName() + " closed");
169  }
170
171  /**
172     Release the underlying ServerMonitor thread, and drop the connections
173     to all connected remote servers. */
174  public 
175  void cleanUp() {
176    // stop the monitor thread
177        LogLog.debug("stopping ServerSocket");
178    serverMonitor.stopMonitor();
179    serverMonitor = null;
180
181    // close all of the connections
182        LogLog.debug("closing client connections");
183    while (oosList.size() != 0) {
184      ObjectOutputStream oos = (ObjectOutputStream)oosList.elementAt(0);
185      if(oos != null) {
186        try {
187                oos.close();
188        } catch(InterruptedIOException e) {
189            Thread.currentThread().interrupt();
190            LogLog.error("could not close oos.", e);
191        } catch(IOException e) {
192            LogLog.error("could not close oos.", e);
193        }
194        
195        oosList.removeElementAt(0);     
196      }
197    }
198  }
199
200  /**
201    Append an event to all of current connections. */
202  public
203  void append(LoggingEvent event) {
204    if (event != null) {
205      // set up location info if requested
206      if (locationInfo) {
207        event.getLocationInformation();
208      }
209      if (application != null) {
210          event.setProperty("application", application);
211        } 
212        event.getNDC();
213        event.getThreadName();
214        event.getMDCCopy();
215        event.getRenderedMessage();
216        event.getThrowableStrRep();
217        
218      if (buffer != null) {
219        buffer.add(event);
220      }
221    }
222
223    // if no event or no open connections, exit now
224    if ((event == null) || (oosList.size() == 0)) {
225      return;
226    }
227
228        // loop through the current set of open connections, appending the event to each
229    for (int streamCount = 0; streamCount < oosList.size(); streamCount++) {            
230
231      ObjectOutputStream oos = null;
232      try {
233        oos = (ObjectOutputStream)oosList.elementAt(streamCount);
234      }
235      catch (ArrayIndexOutOfBoundsException e) {
236        // catch this, but just don't assign a value
237        // this should not really occur as this method is
238        // the only one that can remove oos's (besides cleanUp).
239      }
240      
241      // list size changed unexpectedly? Just exit the append.
242      if (oos == null)
243        break;
244        
245      try {
246        oos.writeObject(event);
247        oos.flush();
248        // Failing to reset the object output stream every now and
249        // then creates a serious memory leak.
250        // right now we always reset. TODO - set up frequency counter per oos?
251        oos.reset();
252      }
253      catch(IOException e) {
254        if (e instanceof InterruptedIOException) {
255            Thread.currentThread().interrupt();
256        }
257          // there was an io exception so just drop the connection
258        oosList.removeElementAt(streamCount);
259        LogLog.debug("dropped connection");
260        
261        // decrement to keep the counter in place (for loop always increments)
262        streamCount--;
263      }
264    }
265  }
266  
267  /**
268     The SocketHubAppender does not use a layout. Hence, this method returns
269     <code>false</code>. */
270  public
271  boolean requiresLayout() {
272    return false;
273  }
274  
275  /**
276     The <b>Port</b> option takes a positive integer representing
277     the port where the server is waiting for connections. */
278  public
279  void setPort(int _port) {
280    port = _port;
281        }
282
283  /**
284   * The <b>App</b> option takes a string value which should be the name of the application getting logged. If property was already set (via system
285   * property), don't set here.
286   */
287  public 
288  void setApplication(String lapp) {
289    this.application = lapp;
290  }
291
292  /**
293   * Returns value of the <b>Application</b> option.
294   */
295  public 
296  String getApplication() {
297    return application;
298  }
299  
300  /**
301     Returns value of the <b>Port</b> option. */
302  public
303  int getPort() {
304    return port;
305  }
306
307  /**
308   * The <b>BufferSize</b> option takes a positive integer representing the number of events this appender will buffer and send to newly connected
309   * clients.
310   */
311  public 
312  void setBufferSize(int _bufferSize) {
313    buffer = new CyclicBuffer(_bufferSize);
314  }
315
316  /**
317   * Returns value of the <b>bufferSize</b> option.
318   */
319  public 
320  int getBufferSize() {
321    if (buffer == null) {
322      return 0;
323    } else {
324      return buffer.getMaxSize();
325    }
326  }
327  
328  /**
329     The <b>LocationInfo</b> option takes a boolean value. If true,
330     the information sent to the remote host will include location
331     information. By default no location information is sent to the server. */
332  public
333  void setLocationInfo(boolean _locationInfo) {
334    locationInfo = _locationInfo;
335  }
336  
337  /**
338     Returns value of the <b>LocationInfo</b> option. */
339  public
340  boolean getLocationInfo() {
341    return locationInfo;
342  }
343
344  public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
345    this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
346  }
347
348  public boolean isAdvertiseViaMulticastDNS() {
349    return advertiseViaMulticastDNS;
350  }
351
352  /**
353    Start the ServerMonitor thread. */
354  private
355  void startServer() {
356    serverMonitor = new ServerMonitor(port, oosList);
357  }
358  
359  /**
360   * Creates a server socket to accept connections.
361   * @param socketPort port on which the socket should listen, may be zero.
362   * @return new socket.
363   * @throws IOException IO error when opening the socket. 
364   */
365  protected ServerSocket createServerSocket(final int socketPort) throws IOException {
366      return new ServerSocket(socketPort);
367  }
368
369  /**
370    This class is used internally to monitor a ServerSocket
371    and register new connections in a vector passed in the
372    constructor. */
373  private class ServerMonitor implements Runnable {
374    private int port;
375    private Vector oosList;
376    private boolean keepRunning;
377    private Thread monitorThread;
378    
379    /**
380      Create a thread and start the monitor. */
381    public
382    ServerMonitor(int _port, Vector _oosList) {
383      port = _port;
384      oosList = _oosList;
385      keepRunning = true;
386      monitorThread = new Thread(this);
387      monitorThread.setDaemon(true);
388      monitorThread.setName("SocketHubAppender-Monitor-" + port);
389      monitorThread.start();
390    }
391    
392    /**
393      Stops the monitor. This method will not return until
394      the thread has finished executing. */
395    public synchronized void stopMonitor() {
396      if (keepRunning) {
397        LogLog.debug("server monitor thread shutting down");
398        keepRunning = false;
399        try {
400            if (serverSocket != null) {
401                serverSocket.close();
402                serverSocket = null;
403            }
404        } catch (IOException ioe) {}
405
406        try {
407          monitorThread.join();
408        }
409        catch (InterruptedException e) {
410            Thread.currentThread().interrupt();
411          // do nothing?
412        }
413        
414        // release the thread
415        monitorThread = null;
416        LogLog.debug("server monitor thread shut down");
417      }
418    }
419    
420    private 
421    void sendCachedEvents(ObjectOutputStream stream) throws IOException {
422      if (buffer != null) {
423        for (int i = 0; i < buffer.length(); i++) {
424          stream.writeObject(buffer.get(i));
425        }
426        stream.flush();
427        stream.reset();
428      }
429    }
430
431    /**
432      Method that runs, monitoring the ServerSocket and adding connections as
433      they connect to the socket. */
434    public
435    void run() {
436      serverSocket = null;
437      try {
438        serverSocket = createServerSocket(port);
439        serverSocket.setSoTimeout(1000);
440      }
441      catch (Exception e) {
442        if (e instanceof InterruptedIOException || e instanceof InterruptedException) {
443            Thread.currentThread().interrupt();
444        }
445        LogLog.error("exception setting timeout, shutting down server socket.", e);
446        keepRunning = false;
447        return;
448      }
449
450      try {
451        try {
452                serverSocket.setSoTimeout(1000);
453        }
454        catch (SocketException e) {
455          LogLog.error("exception setting timeout, shutting down server socket.", e);
456          return;
457        }
458      
459        while (keepRunning) {
460          Socket socket = null;
461          try {
462            socket = serverSocket.accept();
463          }
464          catch (InterruptedIOException e) {
465            // timeout occurred, so just loop
466          }
467          catch (SocketException e) {
468            LogLog.error("exception accepting socket, shutting down server socket.", e);
469            keepRunning = false;
470          }
471          catch (IOException e) {
472            LogLog.error("exception accepting socket.", e);
473          }
474                
475          // if there was a socket accepted
476          if (socket != null) {
477            try {
478              InetAddress remoteAddress = socket.getInetAddress();
479              LogLog.debug("accepting connection from " + remoteAddress.getHostName() 
480                           + " (" + remoteAddress.getHostAddress() + ")");
481                        
482              // create an ObjectOutputStream
483              ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
484              if (buffer != null && buffer.length() > 0) {
485                sendCachedEvents(oos);
486              }
487                    
488              // add it to the oosList.  OK since Vector is synchronized.
489              oosList.addElement(oos);
490            } catch (IOException e) {
491              if (e instanceof InterruptedIOException) {
492                    Thread.currentThread().interrupt();
493              }
494              LogLog.error("exception creating output stream on socket.", e);
495            }
496          }
497        }
498      }
499      finally {
500        // close the socket
501        try {
502                serverSocket.close();
503        } catch(InterruptedIOException e) {
504            Thread.currentThread().interrupt();  
505        } catch (IOException e) {
506                // do nothing with it?
507        }
508      }
509    }
510  }
511}
512