1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| package com.opensymphony.oscache.plugins.clustersupport; |
6 |
| |
7 |
| import com.opensymphony.oscache.base.Cache; |
8 |
| import com.opensymphony.oscache.base.Config; |
9 |
| import com.opensymphony.oscache.base.FinalizationException; |
10 |
| import com.opensymphony.oscache.base.InitializationException; |
11 |
| |
12 |
| import org.apache.commons.logging.Log; |
13 |
| import org.apache.commons.logging.LogFactory; |
14 |
| |
15 |
| import org.jgroups.Address; |
16 |
| import org.jgroups.Channel; |
17 |
| |
18 |
| import org.jgroups.blocks.NotificationBus; |
19 |
| |
20 |
| import java.io.Serializable; |
21 |
| |
22 |
| |
23 |
| |
24 |
| |
25 |
| |
26 |
| |
27 |
| |
28 |
| |
29 |
| |
30 |
| |
31 |
| |
32 |
| |
33 |
| |
34 |
| |
35 |
| |
36 |
| |
37 |
| |
38 |
| public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer { |
39 |
| private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class); |
40 |
| private static final String BUS_NAME = "OSCacheBus"; |
41 |
| private static final String CHANNEL_PROPERTIES = "cache.cluster.properties"; |
42 |
| private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip"; |
43 |
| |
44 |
| |
45 |
| |
46 |
| |
47 |
| |
48 |
| |
49 |
| |
50 |
| |
51 |
| |
52 |
| |
53 |
| |
54 |
| |
55 |
| |
56 |
| |
57 |
| |
58 |
| |
59 |
| |
60 |
| |
61 |
| private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr="; |
62 |
| |
63 |
| |
64 |
| |
65 |
| |
66 |
| |
67 |
| |
68 |
| |
69 |
| |
70 |
| |
71 |
| |
72 |
| |
73 |
| |
74 |
| |
75 |
| |
76 |
| |
77 |
| |
78 |
| |
79 |
| |
80 |
| private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)"; |
81 |
| private static final String DEFAULT_MULTICAST_IP = "231.12.21.132"; |
82 |
| private NotificationBus bus; |
83 |
| |
84 |
| |
85 |
| |
86 |
| |
87 |
| |
88 |
| |
89 |
| |
90 |
| |
91 |
| |
92 |
0
| public synchronized void initialize(Cache cache, Config config) throws InitializationException {
|
93 |
0
| super.initialize(cache, config);
|
94 |
| |
95 |
0
| String properties = config.getProperty(CHANNEL_PROPERTIES);
|
96 |
0
| String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
|
97 |
| |
98 |
0
| if ((properties == null) && (multicastIP == null)) {
|
99 |
0
| multicastIP = DEFAULT_MULTICAST_IP;
|
100 |
| } |
101 |
| |
102 |
0
| if (properties == null) {
|
103 |
0
| properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
|
104 |
| } else { |
105 |
0
| properties = properties.trim();
|
106 |
| } |
107 |
| |
108 |
0
| if (log.isInfoEnabled()) {
|
109 |
0
| log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
|
110 |
| } |
111 |
| |
112 |
0
| try {
|
113 |
0
| bus = new NotificationBus(BUS_NAME, properties);
|
114 |
0
| bus.start();
|
115 |
0
| bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
|
116 |
0
| bus.setConsumer(this);
|
117 |
0
| log.info("JavaGroups clustering support started successfully");
|
118 |
| } catch (Exception e) { |
119 |
0
| throw new InitializationException("Initialization failed: " + e);
|
120 |
| } |
121 |
| } |
122 |
| |
123 |
| |
124 |
| |
125 |
| |
126 |
| |
127 |
| |
128 |
| |
129 |
| |
130 |
0
| public synchronized void finialize() throws FinalizationException {
|
131 |
0
| if (log.isInfoEnabled()) {
|
132 |
0
| log.info("JavaGroups shutting down...");
|
133 |
| } |
134 |
| |
135 |
| |
136 |
0
| if (bus != null) {
|
137 |
0
| bus.stop();
|
138 |
0
| bus = null;
|
139 |
| } else { |
140 |
0
| log.warn("Notification bus wasn't initialized or finialize was invoked before!");
|
141 |
| } |
142 |
| |
143 |
0
| if (log.isInfoEnabled()) {
|
144 |
0
| log.info("JavaGroups shutdown complete.");
|
145 |
| } |
146 |
| } |
147 |
| |
148 |
| |
149 |
| |
150 |
| |
151 |
| |
152 |
| |
153 |
0
| protected void sendNotification(ClusterNotification message) {
|
154 |
0
| bus.sendNotification(message);
|
155 |
| } |
156 |
| |
157 |
| |
158 |
| |
159 |
| |
160 |
| |
161 |
| |
162 |
| |
163 |
0
| public void handleNotification(Serializable serializable) {
|
164 |
0
| if (!(serializable instanceof ClusterNotification)) {
|
165 |
0
| log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
|
166 |
| |
167 |
0
| return;
|
168 |
| } |
169 |
| |
170 |
0
| handleClusterNotification((ClusterNotification) serializable);
|
171 |
| } |
172 |
| |
173 |
| |
174 |
| |
175 |
| |
176 |
| |
177 |
0
| public Serializable getCache() {
|
178 |
0
| return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
|
179 |
| } |
180 |
| |
181 |
| |
182 |
| |
183 |
| |
184 |
| |
185 |
| |
186 |
| |
187 |
0
| public void memberJoined(Address address) {
|
188 |
0
| if (log.isInfoEnabled()) {
|
189 |
0
| log.info("A new member at address '" + address + "' has joined the cluster");
|
190 |
| } |
191 |
| } |
192 |
| |
193 |
| |
194 |
| |
195 |
| |
196 |
| |
197 |
| |
198 |
| |
199 |
0
| public void memberLeft(Address address) {
|
200 |
0
| if (log.isInfoEnabled()) {
|
201 |
0
| log.info("Member at address '" + address + "' left the cluster");
|
202 |
| } |
203 |
| } |
204 |
| } |