Clover coverage report -
Coverage timestamp: So Nov 6 2005 14:19:51 CET
file stats: LOC: 204   Methods: 7
NCLOC: 83   Classes: 1
 
 Source file Conditionals Statements Methods TOTAL
JavaGroupsBroadcastingListener.java 0% 0% 0% 0%
coverage
 1    /*
 2    * Copyright (c) 2002-2003 by OpenSymphony
 3    * All rights reserved.
 4    */
 5    package com.opensymphony.oscache.plugins.clustersupport;
 6   
 7    import com.opensymphony.oscache.base.Cache;
 8    import com.opensymphony.oscache.base.Config;
 9    import com.opensymphony.oscache.base.FinalizationException;
 10    import com.opensymphony.oscache.base.InitializationException;
 11   
 12    import org.apache.commons.logging.Log;
 13    import org.apache.commons.logging.LogFactory;
 14   
 15    import org.jgroups.Address;
 16    import org.jgroups.Channel;
 17   
 18    import org.jgroups.blocks.NotificationBus;
 19   
 20    import java.io.Serializable;
 21   
 22    /**
 23    * <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
 24    * the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
 25    * messages across a cluster.</p>
 26    *
 27    * <p>One of the following properties should be configured in <code>oscache.properties</code> for
 28    * this listener:
 29    * <ul>
 30    * <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
 31    * <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
 32    * control over the behaviour of JavaGroups</li>
 33    * </ul>
 34    * Please refer to the clustering documentation for further details on the configuration of this listener.</p>
 35    *
 36    * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
 37    */
 38    public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer {
 39    private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
 40    private static final String BUS_NAME = "OSCacheBus";
 41    private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
 42    private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
 43   
 44    /**
 45    * The first half of the default channel properties. They default channel properties are:
 46    * <pre>
 47    * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
 48    * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
 49    * PING(timeout=2000;num_initial_members=3):\
 50    * MERGE2(min_interval=5000;max_interval=10000):\
 51    * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
 52    * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
 53    * UNICAST(timeout=300,600,1200,2400):\
 54    * pbcast.STABLE(desired_avg_gossip=20000):\
 55    * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
 56    * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
 57    * </pre>
 58    *
 59    * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
 60    */
 61    private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
 62   
 63    /**
 64    * The second half of the default channel properties. They default channel properties are:
 65    * <pre>
 66    * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
 67    * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
 68    * PING(timeout=2000;num_initial_members=3):\
 69    * MERGE2(min_interval=5000;max_interval=10000):\
 70    * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
 71    * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
 72    * UNICAST(timeout=300,600,1200,2400):\
 73    * pbcast.STABLE(desired_avg_gossip=20000):\
 74    * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
 75    * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
 76    * </pre>
 77    *
 78    * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
 79    */
 80    private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
 81    private static final String DEFAULT_MULTICAST_IP = "231.12.21.132";
 82    private NotificationBus bus;
 83   
 84    /**
 85    * Initializes the broadcasting listener by starting up a JavaGroups notification
 86    * bus instance to handle incoming and outgoing messages.
 87    *
 88    * @param config An OSCache configuration object.
 89    * @throws com.opensymphony.oscache.base.InitializationException If this listener has
 90    * already been initialized.
 91    */
 92  0 public synchronized void initialize(Cache cache, Config config) throws InitializationException {
 93  0 super.initialize(cache, config);
 94   
 95  0 String properties = config.getProperty(CHANNEL_PROPERTIES);
 96  0 String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
 97   
 98  0 if ((properties == null) && (multicastIP == null)) {
 99  0 multicastIP = DEFAULT_MULTICAST_IP;
 100    }
 101   
 102  0 if (properties == null) {
 103  0 properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
 104    } else {
 105  0 properties = properties.trim();
 106    }
 107   
 108  0 if (log.isInfoEnabled()) {
 109  0 log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
 110    }
 111   
 112  0 try {
 113  0 bus = new NotificationBus(BUS_NAME, properties);
 114  0 bus.start();
 115  0 bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
 116  0 bus.setConsumer(this);
 117  0 log.info("JavaGroups clustering support started successfully");
 118    } catch (Exception e) {
 119  0 throw new InitializationException("Initialization failed: " + e);
 120    }
 121    }
 122   
 123    /**
 124    * Shuts down the JavaGroups being managed by this listener. This
 125    * occurs once the cache is shut down and this listener is no longer
 126    * in use.
 127    *
 128    * @throws com.opensymphony.oscache.base.FinalizationException
 129    */
 130  0 public synchronized void finialize() throws FinalizationException {
 131  0 if (log.isInfoEnabled()) {
 132  0 log.info("JavaGroups shutting down...");
 133    }
 134   
 135    // It's possible that the notification bus is null (CACHE-154)
 136  0 if (bus != null) {
 137  0 bus.stop();
 138  0 bus = null;
 139    } else {
 140  0 log.warn("Notification bus wasn't initialized or finialize was invoked before!");
 141    }
 142   
 143  0 if (log.isInfoEnabled()) {
 144  0 log.info("JavaGroups shutdown complete.");
 145    }
 146    }
 147   
 148    /**
 149    * Uses JavaGroups to broadcast the supplied notification message across the cluster.
 150    *
 151    * @param message The cluster nofication message to broadcast.
 152    */
 153  0 protected void sendNotification(ClusterNotification message) {
 154  0 bus.sendNotification(message);
 155    }
 156   
 157    /**
 158    * Handles incoming notification messages from JavaGroups. This method should
 159    * never be called directly.
 160    *
 161    * @param serializable The incoming message object. This must be a {@link ClusterNotification}.
 162    */
 163  0 public void handleNotification(Serializable serializable) {
 164  0 if (!(serializable instanceof ClusterNotification)) {
 165  0 log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
 166   
 167  0 return;
 168    }
 169   
 170  0 handleClusterNotification((ClusterNotification) serializable);
 171    }
 172   
 173    /**
 174    * We are not using the caching, so we just return something that identifies
 175    * us. This method should never be called directly.
 176    */
 177  0 public Serializable getCache() {
 178  0 return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
 179    }
 180   
 181    /**
 182    * A callback that is fired when a new member joins the cluster. This
 183    * method should never be called directly.
 184    *
 185    * @param address The address of the member who just joined.
 186    */
 187  0 public void memberJoined(Address address) {
 188  0 if (log.isInfoEnabled()) {
 189  0 log.info("A new member at address '" + address + "' has joined the cluster");
 190    }
 191    }
 192   
 193    /**
 194    * A callback that is fired when an existing member leaves the cluster.
 195    * This method should never be called directly.
 196    *
 197    * @param address The address of the member who left.
 198    */
 199  0 public void memberLeft(Address address) {
 200  0 if (log.isInfoEnabled()) {
 201  0 log.info("Member at address '" + address + "' left the cluster");
 202    }
 203    }
 204    }