XRootD
Loading...
Searching...
No Matches
XrdClInQueue.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
20#include "XrdCl/XrdClInQueue.hh"
22#include "XrdCl/XrdClMessage.hh"
23#include "XrdCl/XrdClLog.hh"
26
27#include <arpa/inet.h> // for network unmarshalling stuff
28
29namespace XrdCl
30{
31 //----------------------------------------------------------------------------
32 // Filter messages
33 //----------------------------------------------------------------------------
34 bool InQueue::DiscardMessage( Message& msg, uint16_t& sid) const
35 {
36 if( msg.GetSize() < 8 )
37 return true;
38
39 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
40
41 // We only care about async responses, but those are extracted now
42 // in the SocketHandler
43 if( rsp->hdr.status == kXR_attn )
44 return true;
45 else
46 sid = ((uint16_t)rsp->hdr.streamid[1] << 8) | (uint16_t)rsp->hdr.streamid[0];
47
48 return false;
49 }
50
51 //----------------------------------------------------------------------------
52 // Add a listener that should be notified about incoming messages
53 //----------------------------------------------------------------------------
54 void InQueue::AddMessageHandler( MsgHandler *handler, time_t expires, bool &rmMsg )
55 {
56 uint16_t handlerSid = handler->GetSid();
57 XrdSysMutexHelper scopedLock( pMutex );
58
59 pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
60 }
61
62 //----------------------------------------------------------------------------
63 // Get a message handler interested in receiving message whose header
64 // is stored in msg
65 //----------------------------------------------------------------------------
66 MsgHandler *InQueue::GetHandlerForMessage( std::shared_ptr<Message> &msg,
67 time_t &expires,
68 uint16_t &action )
69 {
70 time_t exp = 0;
71 uint16_t act = 0;
72 uint16_t msgSid = 0;
73 MsgHandler* handler = 0;
74
75 if (DiscardMessage(*msg, msgSid))
76 {
77 return handler;
78 }
79
80 XrdSysMutexHelper scopedLock( pMutex );
81 HandlerMap::iterator it = pHandlers.find(msgSid);
82
83 if (it != pHandlers.end())
84 {
85 Log *log = DefaultEnv::GetLog();
86 handler = it->second.first;
87 act = handler->Examine( msg );
88 exp = it->second.second;
89 log->Debug( ExDbgMsg, "[msg: %p] Assigned MsgHandler: %p.",
90 msg.get(), handler );
91
92
94 {
95 pHandlers.erase( it );
96 log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
97 handler, handler );
98 }
99 }
100
101 if( handler )
102 {
103 expires = exp;
104 action = act;
105 }
106
107 return handler;
108 }
109
110 //----------------------------------------------------------------------------
111 // Re-insert the handler without scanning the cached messages
112 //----------------------------------------------------------------------------
114 time_t expires )
115 {
116 uint16_t handlerSid = handler->GetSid();
117 XrdSysMutexHelper scopedLock( pMutex );
118 pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
119 }
120
121 //----------------------------------------------------------------------------
122 // Remove a listener
123 //----------------------------------------------------------------------------
125 {
126 uint16_t handlerSid = handler->GetSid();
127 XrdSysMutexHelper scopedLock( pMutex );
128 pHandlers.erase(handlerSid);
129 Log *log = DefaultEnv::GetLog();
130 log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
131 handler, handler );
132
133 }
134
135 //----------------------------------------------------------------------------
136 // Report an event to the handlers
137 //----------------------------------------------------------------------------
139 XRootDStatus status )
140 {
141 uint8_t action = 0;
142 XrdSysMutexHelper scopedLock( pMutex );
143 for( HandlerMap::iterator it = pHandlers.begin(); it != pHandlers.end(); )
144 {
145 action = it->second.first->OnStreamEvent( event, status );
146
147 if( action & MsgHandler::RemoveHandler )
148 {
149 auto next = it; ++next;
150 pHandlers.erase( it );
151 it = next;
152 }
153 else
154 ++it;
155 }
156 }
157
158 //----------------------------------------------------------------------------
159 // Timeout handlers
160 //----------------------------------------------------------------------------
161 void InQueue::ReportTimeout( time_t now )
162 {
163 if( !now )
164 now = ::time(0);
165
166 XrdSysMutexHelper scopedLock( pMutex );
167 HandlerMap::iterator it = pHandlers.begin();
168 while( it != pHandlers.end() )
169 {
170 if( it->second.second <= now )
171 {
172 uint8_t act = it->second.first->OnStreamEvent( MsgHandler::Timeout,
174 auto next = it; ++next;
175 if( act & MsgHandler::RemoveHandler )
176 pHandlers.erase( it );
177 it = next;
178 }
179 else
180 ++it;
181 }
182 }
183}
kXR_char streamid[2]
Definition XProtocol.hh:914
@ kXR_attn
Definition XProtocol.hh:901
ServerResponseHeader hdr
static Log * GetLog()
Get default log.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
void AddMessageHandler(MsgHandler *handler, time_t expires, bool &rmMsg)
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)=0
virtual uint16_t GetSid() const =0
StreamEvent
Events that may have occurred to the stream.
@ Timeout
The declared timeout has occurred.
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t ExDbgMsg
Procedure execution status.