XRootD
Loading...
Searching...
No Matches
XrdClFileStateHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClURL.hh"
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClStatus.hh"
35#include "XrdCl/XrdClMonitor.hh"
41#include "XrdCl/XrdClUtils.hh"
42
43#ifdef WITH_XRDEC
45#endif
46
47#include "XrdOuc/XrdOucCRC.hh"
49#include "XrdOuc/XrdOucUtils.hh"
50
54
55#include <sstream>
56#include <memory>
57#include <numeric>
58#include <sys/time.h>
59#include <uuid/uuid.h>
60#include <mutex>
61
62namespace
63{
64 //----------------------------------------------------------------------------
65 // Helper callback for handling PgRead responses
66 //----------------------------------------------------------------------------
67 class PgReadHandler : public XrdCl::ResponseHandler
68 {
69 friend class PgReadRetryHandler;
70
71 public:
72
73 //------------------------------------------------------------------------
74 // Constructor
75 //------------------------------------------------------------------------
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 XrdCl::ResponseHandler *userHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
82 maincall( true ),
83 retrycnt( 0 ),
84 nbrepair( 0 )
85 {
86 }
87
88 //------------------------------------------------------------------------
89 // Handle the response
90 //------------------------------------------------------------------------
91 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
92 XrdCl::AnyObject *response,
93 XrdCl::HostList *hostList )
94 {
95 using namespace XrdCl;
96
97 std::unique_lock<std::mutex> lck( mtx );
98
99 if( !maincall )
100 {
101 //--------------------------------------------------------------------
102 // We are serving PgRead retry request
103 //--------------------------------------------------------------------
104 --retrycnt;
105 if( !status->IsOK() )
106 st.reset( status );
107 else
108 {
109 delete status; // by convention other args are null (see PgReadRetryHandler)
110 ++nbrepair; // update number of repaired pages
111 }
112
113 if( retrycnt == 0 )
114 {
115 //------------------------------------------------------------------
116 // All retries came back
117 //------------------------------------------------------------------
118 if( st->IsOK() )
119 {
120 PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121 pginf.SetNbRepair( nbrepair );
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123 }
124 else
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126 lck.unlock();
127 delete this;
128 }
129
130 return;
131 }
132
133 //----------------------------------------------------------------------
134 // We are serving main PgRead request
135 //----------------------------------------------------------------------
136 if( !status->IsOK() )
137 {
138 //--------------------------------------------------------------------
139 // The main PgRead request has failed
140 //--------------------------------------------------------------------
141 userHandler->HandleResponseWithHosts( status, response, hostList );
142 lck.unlock();
143 delete this;
144 return;
145 }
146
147 maincall = false;
148
149 //----------------------------------------------------------------------
150 // Do the integrity check
151 //----------------------------------------------------------------------
152 PageInfo *pginf = 0;
153 response->Get( pginf );
154
155 uint64_t pgoff = pginf->GetOffset();
156 uint32_t bytesRead = pginf->GetLength();
157 std::vector<uint32_t> &cksums = pginf->GetCksums();
158 char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159 size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160 uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161 if( pgsize > bytesRead ) pgsize = bytesRead;
162
163 for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164 {
165 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166 if( crcval != cksums[pgnb] )
167 {
168 Log *log = DefaultEnv::GetLog();
169 log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170 (void*)this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171
172 XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173 if( !st.IsOK())
174 {
175 *status = st; // the reason for this failure
176 break;
177 }
178 ++retrycnt; // update the retry counter
179 }
180
181 bytesRead -= pgsize;
182 buffer += pgsize;
183 pgoff += pgsize;
184 pgsize = XrdSys::PageSize;
185 if( pgsize > bytesRead ) pgsize = bytesRead;
186 }
187
188
189 if( retrycnt == 0 )
190 {
191 //--------------------------------------------------------------------
192 // All went well!
193 //--------------------------------------------------------------------
194 userHandler->HandleResponseWithHosts( status, response, hostList );
195 lck.unlock();
196 delete this;
197 return;
198 }
199
200 //----------------------------------------------------------------------
201 // We have to wait for retries!
202 //----------------------------------------------------------------------
203 resp.reset( response );
204 hosts.reset( hostList );
205 st.reset( status );
206 }
207
208 void UpdateCksum( size_t pgnb, uint32_t crcval )
209 {
210 if( resp )
211 {
212 XrdCl::PageInfo *pginf = 0;
213 resp->Get( pginf );
214 pginf->GetCksums()[pgnb] = crcval;
215 }
216 }
217
218 private:
219
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221 XrdCl::ResponseHandler *userHandler;
222 uint64_t orgOffset;
223
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
227
228 std::mutex mtx;
229 bool maincall;
230 size_t retrycnt;
231 size_t nbrepair;
232
233 };
234
235 //----------------------------------------------------------------------------
236 // Helper callback for handling PgRead retries
237 //----------------------------------------------------------------------------
238 class PgReadRetryHandler : public XrdCl::ResponseHandler
239 {
240 public:
241
242 PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243 pgnb( pgnb )
244 {
245
246 }
247
248 //------------------------------------------------------------------------
249 // Handle the response
250 //------------------------------------------------------------------------
251 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252 XrdCl::AnyObject *response,
253 XrdCl::HostList *hostList )
254 {
255 using namespace XrdCl;
256
257 if( !status->IsOK() )
258 {
259 Log *log = DefaultEnv::GetLog();
260 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263 delete this;
264 return;
265 }
266
267 XrdCl::PageInfo *pginf = 0;
268 response->Get( pginf );
269 if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270 {
271 Log *log = DefaultEnv::GetLog();
272 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274 // we retry a page at a time so the length cannot exceed 4KB
275 DeleteArgs( status, response, hostList );
276 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277 delete this;
278 return;
279 }
280
281 uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282 if( crcval != pginf->GetCksums().front() )
283 {
284 Log *log = DefaultEnv::GetLog();
285 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
288 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289 delete this;
290 return;
291 }
292
293 Log *log = DefaultEnv::GetLog();
294 log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300 delete this;
301 }
302
303 private:
304
305 inline void DeleteArgs( XrdCl::XRootDStatus *status,
306 XrdCl::AnyObject *response,
307 XrdCl::HostList *hostList )
308 {
309 delete status;
310 delete response;
311 delete hostList;
312 }
313
314 PgReadHandler *pgReadHandler;
315 size_t pgnb;
316 };
317
318 //----------------------------------------------------------------------------
319 // Handle PgRead substitution with ordinary Read
320 //----------------------------------------------------------------------------
321 class PgReadSubstitutionHandler : public XrdCl::ResponseHandler
322 {
323 public:
324
325 //------------------------------------------------------------------------
326 // Constructor
327 //------------------------------------------------------------------------
328 PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329 XrdCl::ResponseHandler *userHandler ) :
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
332 {
333 }
334
335 //------------------------------------------------------------------------
336 // Handle the response
337 //------------------------------------------------------------------------
338 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339 XrdCl::AnyObject *rdresp,
340 XrdCl::HostList *hostList )
341 {
342 using namespace XrdCl;
343
344 if( !status->IsOK() )
345 {
346 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
347 delete this;
348 return;
349 }
350
351
352 ChunkInfo *chunk = nullptr;
353 rdresp->Get( chunk );
354
355 if( !chunk )
356 {
357 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
358 delete this;
359 return;
360 }
361
362 std::vector<uint32_t> cksums;
363 if( stateHandler->pIsChannelEncrypted )
364 {
365 size_t nbpages = chunk->length / XrdSys::PageSize;
366 if( chunk->length % XrdSys::PageSize )
367 ++nbpages;
368 cksums.reserve( nbpages );
369
370 size_t size = chunk->length;
371 char *buffer = reinterpret_cast<char*>( chunk->buffer );
372
373 for( size_t pg = 0; pg < nbpages; ++pg )
374 {
375 size_t pgsize = XrdSys::PageSize;
376 if( pgsize > size ) pgsize = size;
377 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
378 cksums.push_back( crcval );
379 buffer += pgsize;
380 size -= pgsize;
381 }
382 }
383
384 PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
385 chunk->buffer, std::move( cksums ) );
386 delete rdresp;
387 AnyObject *response = new AnyObject();
388 response->Set( pages );
389 userHandler->HandleResponseWithHosts( status, response, hostList );
390
391 delete this;
392 }
393
394 private:
395
396 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
397 XrdCl::ResponseHandler *userHandler;
398 };
399
400 //----------------------------------------------------------------------------
401 // Object that does things to the FileStateHandler when kXR_open returns
402 // and then calls the user handler
403 //----------------------------------------------------------------------------
404 class OpenHandler: public XrdCl::ResponseHandler
405 {
406 public:
407 //------------------------------------------------------------------------
408 // Constructor
409 //------------------------------------------------------------------------
410 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
411 XrdCl::ResponseHandler *userHandler ):
412 pStateHandler( stateHandler ),
413 pUserHandler( userHandler )
414 {
415 }
416
417 //------------------------------------------------------------------------
418 // Handle the response
419 //------------------------------------------------------------------------
420 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
421 XrdCl::AnyObject *response,
422 XrdCl::HostList *hostList )
423 {
424 using namespace XrdCl;
425
426 //----------------------------------------------------------------------
427 // Extract the statistics info
428 //----------------------------------------------------------------------
429 OpenInfo *openInfo = 0;
430 if( status->IsOK() )
431 response->Get( openInfo );
432#ifdef WITH_XRDEC
433 else
434 //--------------------------------------------------------------------
435 // Handle EC redirect
436 //--------------------------------------------------------------------
437 if( status->code == errRedirect )
438 {
439 std::string ecurl = status->GetErrorMessage();
440 EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
441 if( ecHandler && pStateHandler->NeedFileTempl() )
442 {
443 delete status;
444 status = new XRootDStatus( stError, errNotSupported, 0,
445 "File template not supported with Ec" );
446 delete ecHandler;
447 ecHandler = 0;
448 }
449 else if( ecHandler )
450 {
451 pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
452 ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
453 return;
454 }
455 }
456#endif
457 //----------------------------------------------------------------------
458 // Notify the state handler and the client and say bye bye
459 //----------------------------------------------------------------------
460 pStateHandler->OnOpen( status, openInfo, hostList );
461 delete response;
462 if( pUserHandler )
463 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
464 else
465 {
466 delete status;
467 delete hostList;
468 }
469 delete this;
470 }
471
472 private:
473 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
474 XrdCl::ResponseHandler *pUserHandler;
475 };
476
477 //----------------------------------------------------------------------------
478 // Object that does things to the FileStateHandler when kXR_close returns
479 // and then calls the user handler
480 //----------------------------------------------------------------------------
481 class CloseHandler: public XrdCl::ResponseHandler
482 {
483 public:
484 //------------------------------------------------------------------------
485 // Constructor
486 //------------------------------------------------------------------------
487 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
488 XrdCl::ResponseHandler *userHandler,
489 XrdCl::Message *message ):
490 pStateHandler( stateHandler ),
491 pUserHandler( userHandler ),
492 pMessage( message )
493 {
494 }
495
496 //------------------------------------------------------------------------
498 //------------------------------------------------------------------------
499 virtual ~CloseHandler()
500 {
501 delete pMessage;
502 }
503
504 //------------------------------------------------------------------------
505 // Handle the response
506 //------------------------------------------------------------------------
507 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
508 XrdCl::AnyObject *response,
509 XrdCl::HostList *hostList )
510 {
511 pStateHandler->OnClose( status );
512 if( pUserHandler )
513 pUserHandler->HandleResponseWithHosts( status, response, hostList );
514 else
515 {
516 delete response;
517 delete status;
518 delete hostList;
519 }
520
521 delete this;
522 }
523
524 private:
525 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
526 XrdCl::ResponseHandler *pUserHandler;
527 XrdCl::Message *pMessage;
528 };
529
530 //----------------------------------------------------------------------------
531 // Stateful message handler
532 //----------------------------------------------------------------------------
533 class StatefulHandler: public XrdCl::ResponseHandler
534 {
535 public:
536 //------------------------------------------------------------------------
537 // Constructor
538 //------------------------------------------------------------------------
539 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
540 XrdCl::ResponseHandler *userHandler,
541 XrdCl::Message *message,
542 const XrdCl::MessageSendParams &sendParams ):
543 pStateHandler( stateHandler ),
544 pUserHandler( userHandler ),
545 pMessage( message ),
546 pSendParams( sendParams )
547 {
548 }
549
550 //------------------------------------------------------------------------
551 // Destructor
552 //------------------------------------------------------------------------
553 virtual ~StatefulHandler()
554 {
555 delete pMessage;
556 delete pSendParams.chunkList;
557 delete pSendParams.kbuff;
558 }
559
560 //------------------------------------------------------------------------
561 // Handle the response
562 //------------------------------------------------------------------------
563 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
564 XrdCl::AnyObject *response,
565 XrdCl::HostList *hostList )
566 {
567 using namespace XrdCl;
568 std::unique_ptr<AnyObject> responsePtr( response );
569 pSendParams.hostList = hostList;
570
571 //----------------------------------------------------------------------
572 // Houston we have a problem...
573 //----------------------------------------------------------------------
574 if( !status->IsOK() )
575 {
576 XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
577 return;
578 }
579
580 //----------------------------------------------------------------------
581 // We're clear
582 //----------------------------------------------------------------------
583 responsePtr.release();
584 XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
585 if( pUserHandler )
586 pUserHandler->HandleResponseWithHosts( status, response, hostList );
587 else
588 {
589 delete status,
590 delete response;
591 delete hostList;
592 }
593 delete this;
594 }
595
596 //------------------------------------------------------------------------
598 //------------------------------------------------------------------------
599 XrdCl::ResponseHandler *GetUserHandler()
600 {
601 return pUserHandler;
602 }
603
604 private:
605 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
606 XrdCl::ResponseHandler *pUserHandler;
607 XrdCl::Message *pMessage;
608 XrdCl::MessageSendParams pSendParams;
609 };
610
611 //----------------------------------------------------------------------------
612 // Release-buffer Handler
613 //----------------------------------------------------------------------------
614 class ReleaseBufferHandler: public XrdCl::ResponseHandler
615 {
616 public:
617
618 //------------------------------------------------------------------------
619 // Constructor
620 //------------------------------------------------------------------------
621 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
622 buffer( std::move( buffer ) ),
623 handler( handler )
624 {
625 }
626
627 //------------------------------------------------------------------------
628 // Handle the response
629 //------------------------------------------------------------------------
630 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
631 XrdCl::AnyObject *response,
632 XrdCl::HostList *hostList )
633 {
634 if (handler)
635 handler->HandleResponseWithHosts( status, response, hostList );
636 }
637
638 //------------------------------------------------------------------------
639 // Get the underlying buffer
640 //------------------------------------------------------------------------
641 XrdCl::Buffer& GetBuffer()
642 {
643 return buffer;
644 }
645
646 private:
647 XrdCl::Buffer buffer;
648 XrdCl::ResponseHandler *handler;
649 };
650}
651
652namespace XrdCl
653{
654 //----------------------------------------------------------------------------
655 // Constructor
656 //----------------------------------------------------------------------------
658 pFileState( Closed ),
659 pStatInfo( 0 ),
660 pFileUrl( 0 ),
661 pDataServer( 0 ),
662 pLoadBalancer( 0 ),
663 pStateRedirect( 0 ),
664 pWrtRecoveryRedir( 0 ),
665 pFileHandle( 0 ),
666 pOpenMode( 0 ),
667 pOpenFlags( OpenFlags::None ),
668 pSessionId( 0 ),
669 pDoRecoverRead( true ),
670 pDoRecoverWrite( true ),
671 pFollowRedirects( true ),
672 pUseVirtRedirector( true ),
673 pIsChannelEncrypted( false ),
674 pAllowBundledClose( false ),
675 pPlugin( plugin )
676 {
677 pFileHandle = new uint8_t[4];
678 ResetMonitoringVars();
681 pLFileHandler = new LocalFileHandler();
682 }
683
684 //------------------------------------------------------------------------
689 //------------------------------------------------------------------------
690 FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
691 pFileState( Closed ),
692 pStatInfo( 0 ),
693 pFileUrl( 0 ),
694 pDataServer( 0 ),
695 pLoadBalancer( 0 ),
696 pStateRedirect( 0 ),
697 pWrtRecoveryRedir( 0 ),
698 pFileHandle( 0 ),
699 pOpenMode( 0 ),
700 pOpenFlags( OpenFlags::None ),
701 pSessionId( 0 ),
702 pDoRecoverRead( true ),
703 pDoRecoverWrite( true ),
704 pFollowRedirects( true ),
705 pUseVirtRedirector( useVirtRedirector ),
706 pAllowBundledClose( false ),
707 pPlugin( plugin )
708 {
709 pFileHandle = new uint8_t[4];
710 ResetMonitoringVars();
713 pLFileHandler = new LocalFileHandler();
714 }
715
716 //----------------------------------------------------------------------------
717 // Destructor
718 //----------------------------------------------------------------------------
720 {
721 //--------------------------------------------------------------------------
722 // This, in principle, should never ever happen. Except for the case
723 // when we're interfaced with ROOT that may call this desctructor from
724 // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
725 // has been finalized by the linker. So, if we don't have the log object
726 // at this point we just give up the hope.
727 //--------------------------------------------------------------------------
728 if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
729 DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
730
733
736
737 if( pFileState != Closed && DefaultEnv::GetLog() )
738 {
739 XRootDStatus st;
740 MonitorClose( &st );
741 ResetMonitoringVars();
742 }
743
744 // check if the logger is still there, this is only for root, as root might
745 // have unload us already so in this case we don't want to do anything
746 if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
747 {
749 registry.Release( *pFileUrl );
750 }
751
752 delete pStatInfo;
753 delete pFileUrl;
754 delete pDataServer;
755 delete pLoadBalancer;
756 delete [] pFileHandle;
757 delete pLFileHandler;
758 }
759
760 //----------------------------------------------------------------------------
761 // Open with file template
762 //----------------------------------------------------------------------------
764 std::shared_ptr<FileStateHandler> &self,
766 const std::string &url,
767 OpenFlags::Flags flags,
768 uint16_t mode,
769 ResponseHandler *handler,
770 time_t timeout )
771 {
772 if( !templ )
773 return XRootDStatus( stError, errInvalidArgs, 0, "Template file not available" );
774
775 FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>( templ );
776 if( !fht )
777 return XRootDStatus( stError, errInvalidArgs, 0, "Template file invalid" );
778
779 self->pTemplateFileWp = fht->pTemplateFileWp;
780
781 return OpenImpl( self, url, flags, mode, handler, timeout );
782 }
783
784 //----------------------------------------------------------------------------
785 // Open the file pointed to by the given URL
786 //----------------------------------------------------------------------------
787 XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
788 const std::string &url,
789 OpenFlags::Flags flags,
790 uint16_t mode,
791 ResponseHandler *handler,
792 time_t timeout )
793 {
794 self->pTemplateFileWp.reset();
795 return OpenImpl( self, url, flags, mode, handler, timeout );
796 }
797
798 //----------------------------------------------------------------------------
799 // Most of Open implementation, used by Open and OpenUsingTemplate
800 //----------------------------------------------------------------------------
801 XRootDStatus FileStateHandler::OpenImpl( std::shared_ptr<FileStateHandler> &self,
802 const std::string &url,
803 OpenFlags::Flags flags,
804 uint16_t mode,
805 ResponseHandler *handler,
806 time_t timeout )
807 {
808 XrdSysMutexHelper scopedLock( self->pMutex );
809
810 //--------------------------------------------------------------------------
811 // Check if we can proceed
812 //--------------------------------------------------------------------------
813 if( self->pFileState == Error )
814 return self->pStatus;
815
816 if( self->pFileState == OpenInProgress )
818
819 if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
820 self->pFileState == Recovering )
822
823 self->pFileState = OpenInProgress;
824
825 //--------------------------------------------------------------------------
826 // Check if the parameters are valid
827 //--------------------------------------------------------------------------
828 Log *log = DefaultEnv::GetLog();
829
830 if( self->pFileUrl )
831 {
832 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
833 {
835 registry.Release( *self->pFileUrl );
836 }
837 delete self->pFileUrl;
838 self->pFileUrl = 0;
839 }
840
841 self->pFileUrl = new URL( url );
842
843 //--------------------------------------------------------------------------
844 // Add unique uuid to each open request so replays due to error/timeout
845 // recovery can be correctly handled.
846 //--------------------------------------------------------------------------
847 URL::ParamsMap cgi = self->pFileUrl->GetParams();
848 uuid_t uuid;
849 char requuid[37]= {0};
850 uuid_generate( uuid );
851 uuid_unparse( uuid, requuid );
852 cgi["xrdcl.requuid"] = requuid;
853 self->pFileUrl->SetParams( cgi );
854
855 if( !self->pFileUrl->IsValid() )
856 {
857 log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
858 (void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
859 self->pStatus = XRootDStatus( stError, errInvalidArgs );
860 self->pFileState = Closed;
861 return self->pStatus;
862 }
863
864 //--------------------------------------------------------------------------
865 // Check if the recovery procedures should be enabled
866 //--------------------------------------------------------------------------
867 const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
868 URL::ParamsMap::const_iterator it;
869 it = urlParams.find( "xrdcl.recover-reads" );
870 if( (it != urlParams.end() && it->second == "false") ||
871 !self->pDoRecoverRead )
872 {
873 self->pDoRecoverRead = false;
874 log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
875 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
876 }
877
878 it = urlParams.find( "xrdcl.recover-writes" );
879 if( (it != urlParams.end() && it->second == "false") ||
880 !self->pDoRecoverWrite )
881 {
882 self->pDoRecoverWrite = false;
883 log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
884 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
885 }
886
887 //--------------------------------------------------------------------------
888 // Open the file
889 //--------------------------------------------------------------------------
890 log->Debug( FileMsg, "[%p@%s] Sending an open command", (void*)self.get(),
891 self->pFileUrl->GetObfuscatedURL().c_str() );
892
893 self->pOpenMode = mode;
894 self->pOpenFlags = flags;
895 OpenHandler *openHandler = new OpenHandler( self, handler );
896
897 Message *msg;
898 ClientOpenRequest *req;
899 std::string path = self->pFileUrl->GetPathWithFilteredParams();
900 MessageUtils::CreateRequest( msg, req, path.length() );
901
902 req->requestid = kXR_open;
903 req->mode = mode;
904 req->options = (flags&0xffff) | kXR_async | kXR_retstat;
905 req->dlen = path.length();
906 URL sendUrl;
907 XRootDStatus st = FillFhTempl( self, *self->pFileUrl, msg, sendUrl );
908 if( !st.IsOK() )
909 {
910 delete openHandler;
911 self->pStatus = st;
912 self->pFileState = Closed;
913 return st;
914 }
915 msg->Append( path.c_str(), path.length(), 24 );
916
918 MessageSendParams params; params.timeout = timeout;
919 params.followRedirects = self->pFollowRedirects;
921
922 st = self->IssueRequest( sendUrl, msg, openHandler, params );
923
924 if( !st.IsOK() )
925 {
926 delete openHandler;
927 self->pStatus = st;
928 self->pFileState = Closed;
929 return st;
930 }
931 return st;
932 }
933
934 //----------------------------------------------------------------------------
935 // Close the file object
936 //----------------------------------------------------------------------------
937 XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
938 ResponseHandler *handler,
939 time_t timeout )
940 {
941 XrdSysMutexHelper scopedLock( self->pMutex );
942
943 //--------------------------------------------------------------------------
944 // Check if we can proceed
945 //--------------------------------------------------------------------------
946 if( self->pFileState == Error )
947 return self->pStatus;
948
949 if( self->pFileState == CloseInProgress )
951
952 if( self->pFileState == Closed )
954
955 if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
957
958 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
960
961 self->pFileState = CloseInProgress;
962
963 Log *log = DefaultEnv::GetLog();
964 log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
965 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
966 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
967
968 //--------------------------------------------------------------------------
969 // Close the file
970 //--------------------------------------------------------------------------
971 Message *msg;
973 MessageUtils::CreateRequest( msg, req );
974
975 req->requestid = kXR_close;
976 memcpy( req->fhandle, self->pFileHandle, 4 );
977
979 msg->SetSessionId( self->pSessionId );
980 CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
981 MessageSendParams params;
982 params.timeout = timeout;
983 params.followRedirects = false;
984 params.stateful = true;
986
987 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
988
989 if( !st.IsOK() )
990 {
991 // an invalid-session error means the connection to the server has been
992 // closed, which in turn means that the server closed the file already
995 st.code == errPollerError || st.code == errSocketError )
996 {
997 self->pFileState = Closed;
998 ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
999 nullptr, nullptr );
1001 return XRootDStatus();
1002 }
1003
1004 delete closeHandler;
1005 self->pStatus = st;
1006 self->pFileState = Error;
1007 return st;
1008 }
1009 return st;
1010 }
1011
1012 //----------------------------------------------------------------------------
1013 // Stat the file
1014 //----------------------------------------------------------------------------
1015 XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
1016 bool force,
1017 ResponseHandler *handler,
1018 time_t timeout )
1019 {
1020 XrdSysMutexHelper scopedLock( self->pMutex );
1021
1022 if( self->pFileState == Error ) return self->pStatus;
1023
1024 if( self->pFileState != Opened && self->pFileState != Recovering )
1026
1027 //--------------------------------------------------------------------------
1028 // Return the cached info
1029 //--------------------------------------------------------------------------
1030 if( !force )
1031 {
1032 AnyObject *obj = new AnyObject();
1033 obj->Set( new StatInfo( *self->pStatInfo ) );
1034 if (handler)
1035 handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
1036 return XRootDStatus();
1037 }
1038
1039 Log *log = DefaultEnv::GetLog();
1040 log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
1041 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1042 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1043
1044 //--------------------------------------------------------------------------
1045 // Issue a new stat request
1046 // stating a file handle doesn't work (fixed in 3.2.0) so we need to
1047 // stat the pat
1048 //--------------------------------------------------------------------------
1049 Message *msg;
1050 ClientStatRequest *req;
1051 std::string path = self->pFileUrl->GetPath();
1052 MessageUtils::CreateRequest( msg, req );
1053
1054 req->requestid = kXR_stat;
1055 memcpy( req->fhandle, self->pFileHandle, 4 );
1056
1057 MessageSendParams params;
1058 params.timeout = timeout;
1059 params.followRedirects = false;
1060 params.stateful = true;
1062
1064 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1065
1066 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1067 }
1068
1069 //----------------------------------------------------------------------------
1070 // Preread scattered data tracts in one operation - async
1071 //----------------------------------------------------------------------------
1072 XRootDStatus FileStateHandler::PreRead( std::shared_ptr<FileStateHandler> &self,
1073 const TractList &tracts,
1074 ResponseHandler *handler,
1075 time_t timeout )
1076 {
1077 //--------------------------------------------------------------------------
1078 // Sanity check
1079 //--------------------------------------------------------------------------
1080 XrdSysMutexHelper scopedLock( self->pMutex );
1081
1082 if( self->pFileState == Error ) return self->pStatus;
1083
1084 if( self->pFileState != Opened && self->pFileState != Recovering )
1086
1087 Log *log = DefaultEnv::GetLog();
1088 log->Debug( FileMsg, "[%p@%s] Sending an read+preread command for handle %#x to %s",
1089 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1090 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1091
1092 //--------------------------------------------------------------------------
1093 // Build the message
1094 //--------------------------------------------------------------------------
1095 Message *msg;
1096 ClientReadRequest *req;
1097 MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*tracts.size() + 8 );
1098
1099 req->requestid = kXR_read;
1100 req->offset = 0;
1101 req->rlen = 0;
1102 memcpy( req->fhandle, self->pFileHandle, 4 );
1103 req->dlen = sizeof(readahead_list)*tracts.size() + 8;
1104
1105 static char dummyBuff[8];
1106 ChunkList *list = new ChunkList();
1107 list->push_back( ChunkInfo( 0, 0, dummyBuff ) );
1108
1109 //--------------------------------------------------------------------------
1110 // Copy the tract info
1111 //--------------------------------------------------------------------------
1112 readahead_list *dataTract = (readahead_list*)msg->GetBuffer( 24 + 8 );
1113 for( size_t i = 0; i < tracts.size(); ++i )
1114 {
1115 dataTract[i].rlen = tracts[i].length;
1116 dataTract[i].offset = tracts[i].offset;
1117 memcpy( dataTract[i].fhandle, req->fhandle, 4 );
1118 }
1119
1120 //--------------------------------------------------------------------------
1121 // Send the message
1122 //--------------------------------------------------------------------------
1123 MessageSendParams params;
1124 params.timeout = timeout;
1125 params.followRedirects = false;
1126 params.stateful = true;
1127 params.chunkList = list;
1129
1131 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1132
1133 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1134 }
1135
1136 //----------------------------------------------------------------------------
1137 // Read a data chunk at a given offset - sync
1138 //----------------------------------------------------------------------------
1139 XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1140 uint64_t offset,
1141 uint32_t size,
1142 void *buffer,
1143 ResponseHandler *handler,
1144 time_t timeout )
1145 {
1146 XrdSysMutexHelper scopedLock( self->pMutex );
1147
1148 if( self->pFileState == Error ) return self->pStatus;
1149
1150 if( self->pFileState != Opened && self->pFileState != Recovering )
1152
1153 Log *log = DefaultEnv::GetLog();
1154 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1155 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1156 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1157
1158 Message *msg;
1159 ClientReadRequest *req;
1160 MessageUtils::CreateRequest( msg, req );
1161
1162 req->requestid = kXR_read;
1163 req->offset = offset;
1164 req->rlen = size;
1165 memcpy( req->fhandle, self->pFileHandle, 4 );
1166
1167 ChunkList *list = new ChunkList();
1168 list->push_back( ChunkInfo( offset, size, buffer ) );
1169
1171 MessageSendParams params;
1172 params.timeout = timeout;
1173 params.followRedirects = false;
1174 params.stateful = true;
1175 params.chunkList = list;
1177 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1178
1179 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1180 }
1181
1182 //------------------------------------------------------------------------
1183 // Read data pages at a given offset
1184 //------------------------------------------------------------------------
1185 XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1186 uint64_t offset,
1187 uint32_t size,
1188 void *buffer,
1189 ResponseHandler *handler,
1190 time_t timeout )
1191 {
1192 int issupported = true;
1193 AnyObject obj;
1195 int protver = 0;
1196 XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1197 if( st1.IsOK() && st2.IsOK() )
1198 {
1199 int *ptr = 0;
1200 obj.Get( ptr );
1201 issupported = ( ptr && (*ptr & kXR_suppgrw) ) && ( protver >= kXR_PROTPGRWVERSION );
1202 delete ptr;
1203 }
1204 else
1205 issupported = false;
1206
1207 if( !issupported )
1208 {
1209 DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1210 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1211 ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1212 auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1213 if( !st.IsOK() ) delete substitHandler;
1214 return st;
1215 }
1216
1217 ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1218 auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1219 if( !st.IsOK() ) delete pgHandler;
1220 return st;
1221 }
1222
1223 XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1224 uint64_t offset,
1225 uint32_t size,
1226 size_t pgnb,
1227 void *buffer,
1228 PgReadHandler *handler,
1229 time_t timeout )
1230 {
1231 if( size > (uint32_t)XrdSys::PageSize )
1232 return XRootDStatus( stError, errInvalidArgs, EINVAL,
1233 "PgRead retry size exceeded 4KB." );
1234
1235 ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1236 XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1237 if( !st.IsOK() ) delete retryHandler;
1238 return st;
1239 }
1240
1241 XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1242 uint64_t offset,
1243 uint32_t size,
1244 void *buffer,
1245 uint16_t flags,
1246 ResponseHandler *handler,
1247 time_t timeout )
1248 {
1249 XrdSysMutexHelper scopedLock( self->pMutex );
1250
1251 if( self->pFileState == Error ) return self->pStatus;
1252
1253 if( self->pFileState != Opened && self->pFileState != Recovering )
1255
1256 Log *log = DefaultEnv::GetLog();
1257 log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1258 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1259 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1260
1261 Message *msg;
1263 MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1264
1265 req->requestid = kXR_pgread;
1266 req->offset = offset;
1267 req->rlen = size;
1268 memcpy( req->fhandle, self->pFileHandle, 4 );
1269
1270 //--------------------------------------------------------------------------
1271 // Now adjust the message size so it can hold PgRead arguments
1272 //--------------------------------------------------------------------------
1273 req->dlen = sizeof( ClientPgReadReqArgs );
1274 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1275 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1276 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1277 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1278 args->reqflags = flags;
1279
1280 ChunkList *list = new ChunkList();
1281 list->push_back( ChunkInfo( offset, size, buffer ) );
1282
1284 MessageSendParams params;
1285 params.timeout = timeout;
1286 params.followRedirects = false;
1287 params.stateful = true;
1288 params.chunkList = list;
1290 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1291
1292 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1293 }
1294
1295 //----------------------------------------------------------------------------
1296 // Write a data chunk at a given offset - async
1297 //----------------------------------------------------------------------------
1298 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1299 uint64_t offset,
1300 uint32_t size,
1301 const void *buffer,
1302 ResponseHandler *handler,
1303 time_t timeout )
1304 {
1305 XrdSysMutexHelper scopedLock( self->pMutex );
1306
1307 if( self->pFileState == Error ) return self->pStatus;
1308
1309 if( self->pFileState != Opened && self->pFileState != Recovering )
1311
1312 Log *log = DefaultEnv::GetLog();
1313 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1314 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1315 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1316
1317 Message *msg;
1318 ClientWriteRequest *req;
1319 MessageUtils::CreateRequest( msg, req );
1320
1321 req->requestid = kXR_write;
1322 req->offset = offset;
1323 req->dlen = size;
1324 memcpy( req->fhandle, self->pFileHandle, 4 );
1325
1326 ChunkList *list = new ChunkList();
1327 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1328
1329 MessageSendParams params;
1330 params.timeout = timeout;
1331 params.followRedirects = false;
1332 params.stateful = true;
1333 params.chunkList = list;
1334
1336
1338 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1339
1340 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1341 }
1342
1343 //----------------------------------------------------------------------------
1344 // Write a data chunk at a given offset
1345 //----------------------------------------------------------------------------
1346 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1347 uint64_t offset,
1348 Buffer &&buffer,
1349 ResponseHandler *handler,
1350 time_t timeout )
1351 {
1352 //--------------------------------------------------------------------------
1353 // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1354 // so fall back to normal write
1355 //--------------------------------------------------------------------------
1356 if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1357 {
1358 Log *log = DefaultEnv::GetLog();
1359 log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1360 "cannot convert it to kernel space buffer.", (void*)self.get(),
1361 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1362
1363 void *buff = buffer.GetBuffer();
1364 uint32_t size = buffer.GetSize();
1365 ReleaseBufferHandler *wrtHandler =
1366 new ReleaseBufferHandler( std::move( buffer ), handler );
1367 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1368 if( !st.IsOK() )
1369 {
1370 buffer = std::move( wrtHandler->GetBuffer() );
1371 delete wrtHandler;
1372 }
1373 return st;
1374 }
1375
1376 //--------------------------------------------------------------------------
1377 // Transfer the data from user space to kernel space
1378 //--------------------------------------------------------------------------
1379 uint32_t length = buffer.GetSize();
1380 char *ubuff = buffer.Release();
1381
1382 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1383 ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1384 if( ret < 0 )
1386
1387 //--------------------------------------------------------------------------
1388 // Now create a write request and enqueue it
1389 //--------------------------------------------------------------------------
1390 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1391 }
1392
1393 //----------------------------------------------------------------------------
1394 // Write a data from a given file descriptor at a given offset - async
1395 //----------------------------------------------------------------------------
1396 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1397 uint64_t offset,
1398 uint32_t size,
1399 Optional<uint64_t> fdoff,
1400 int fd,
1401 ResponseHandler *handler,
1402 time_t timeout )
1403 {
1404 //--------------------------------------------------------------------------
1405 // Read the data from the file descriptor into a kernel buffer
1406 //--------------------------------------------------------------------------
1407 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1408 ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1409 XrdSys::Read( fd, *kbuff, size );
1410 if( ret < 0 )
1412
1413 //--------------------------------------------------------------------------
1414 // Now create a write request and enqueue it
1415 //--------------------------------------------------------------------------
1416 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1417 }
1418
1419 //----------------------------------------------------------------------------
1420 // Write number of pages at a given offset - async
1421 //----------------------------------------------------------------------------
1422 XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1423 uint64_t offset,
1424 uint32_t size,
1425 const void *buffer,
1426 std::vector<uint32_t> &cksums,
1427 ResponseHandler *handler,
1428 time_t timeout )
1429 {
1430 //--------------------------------------------------------------------------
1431 // Resolve timeout value
1432 //--------------------------------------------------------------------------
1433 if( timeout == 0 )
1434 {
1435 int val = DefaultRequestTimeout;
1436 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1437 timeout = val;
1438 }
1439
1440 //--------------------------------------------------------------------------
1441 // Validate the digest vector size
1442 //--------------------------------------------------------------------------
1443 if( cksums.empty() )
1444 {
1445 const char *data = static_cast<const char*>( buffer );
1446 XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1447 }
1448 else
1449 {
1450 size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1451 if( crc32cCnt != cksums.size() )
1452 return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1453 }
1454
1455 //--------------------------------------------------------------------------
1456 // Create a context for PgWrite operation
1457 //--------------------------------------------------------------------------
1458 struct pgwrt_t
1459 {
1460 pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1461 {
1462 }
1463
1464 ~pgwrt_t()
1465 {
1466 if( handler )
1467 {
1468 // if all retries were successful no error status was set
1469 if( !status ) status = new XRootDStatus();
1470 handler->HandleResponse( status, nullptr );
1471 }
1472 }
1473
1474 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1475 {
1476 if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1477 return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1478 }
1479
1480 inline void SetStatus( XRootDStatus* s )
1481 {
1482 if( !status ) status = s;
1483 else delete s;
1484 }
1485
1486 ResponseHandler *handler;
1487 XRootDStatus *status;
1488 };
1489 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1490
1491 int fLen, lLen;
1492 XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1493 uint32_t fstpglen = fLen;
1494
1495 time_t start = ::time( nullptr );
1496 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1497 {
1498 std::unique_ptr<AnyObject> scoped( r );
1499 // if the request failed simply pass the status to the
1500 // user handler
1501 if( !s->IsOK() )
1502 {
1503 pgwrt->SetStatus( s );
1504 return; // pgwrt destructor will call the handler
1505 }
1506 // also if the request was sucessful and there were no
1507 // corrupted pages pass the status to the user handler
1508 RetryInfo *inf = nullptr;
1509 r->Get( inf );
1510 if( !inf->NeedRetry() )
1511 {
1512 pgwrt->SetStatus( s );
1513 return; // pgwrt destructor will call the handler
1514 }
1515 delete s;
1516 // first adjust the timeout value
1517 time_t elapsed = ::time( nullptr ) - start;
1518 if( elapsed >= timeout )
1519 {
1520 pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1521 return; // pgwrt destructor will call the handler
1522 }
1523 else timeout -= elapsed;
1524 // retransmit the corrupted pages
1525 for( size_t i = 0; i < inf->Size(); ++i )
1526 {
1527 auto tpl = inf->At( i );
1528 uint64_t pgoff = std::get<0>( tpl );
1529 uint32_t pglen = std::get<1>( tpl );
1530 const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1531 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1532 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1533 {
1534 std::unique_ptr<AnyObject> scoped( r );
1535 // if we failed simply set the status
1536 if( !s->IsOK() )
1537 {
1538 pgwrt->SetStatus( s );
1539 return; // the destructor will call the handler
1540 }
1541 delete s;
1542 // otherwise check if the data were not corrupted again
1543 RetryInfo *inf = nullptr;
1544 r->Get( inf );
1545 if( inf->NeedRetry() ) // so we failed in the end
1546 {
1547 DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1548 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1549 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1550 pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1551 "Failed to retransmit corrupted page" ) );
1552 }
1553 else
1554 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1555 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1556 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1557 } );
1558 auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1559 if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1560 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1561 "pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1562 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1563 }
1564 } );
1565
1566 auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1567 if( !st.IsOK() )
1568 {
1569 pgwrt->handler = nullptr;
1570 delete h;
1571 }
1572 return st;
1573 }
1574
1575 //------------------------------------------------------------------------
1576 // Write number of pages at a given offset - async
1577 //------------------------------------------------------------------------
1578 XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1579 uint64_t offset,
1580 uint32_t size,
1581 const void *buffer,
1582 uint32_t digest,
1583 ResponseHandler *handler,
1584 time_t timeout )
1585 {
1586 std::vector<uint32_t> cksums{ digest };
1587 return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1588 }
1589
1590 //------------------------------------------------------------------------
1591 // Write number of pages at a given offset - async
1592 //------------------------------------------------------------------------
1593 XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1594 uint64_t offset,
1595 uint32_t size,
1596 const void *buffer,
1597 std::vector<uint32_t> &cksums,
1598 kXR_char flags,
1599 ResponseHandler *handler,
1600 time_t timeout )
1601 {
1602 XrdSysMutexHelper scopedLock( self->pMutex );
1603
1604 if( self->pFileState == Error ) return self->pStatus;
1605
1606 if( self->pFileState != Opened && self->pFileState != Recovering )
1608
1609 Log *log = DefaultEnv::GetLog();
1610 log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1611 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1612 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1613
1614 //--------------------------------------------------------------------------
1615 // Create the message
1616 //--------------------------------------------------------------------------
1617 Message *msg;
1619 MessageUtils::CreateRequest( msg, req );
1620
1621 req->requestid = kXR_pgwrite;
1622 req->offset = offset;
1623 req->dlen = size + cksums.size() * sizeof( uint32_t );
1624 req->reqflags = flags;
1625 memcpy( req->fhandle, self->pFileHandle, 4 );
1626
1627 ChunkList *list = new ChunkList();
1628 list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1629
1630 MessageSendParams params;
1631 params.timeout = timeout;
1632 params.followRedirects = false;
1633 params.stateful = true;
1634 params.chunkList = list;
1635 params.crc32cDigests.swap( cksums );
1636
1638
1640 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1641
1642 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1643 }
1644
1645 //----------------------------------------------------------------------------
1646 // Commit all pending disk writes - async
1647 //----------------------------------------------------------------------------
1648 XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1649 ResponseHandler *handler,
1650 time_t timeout )
1651 {
1652 XrdSysMutexHelper scopedLock( self->pMutex );
1653
1654 if( self->pFileState == Error ) return self->pStatus;
1655
1656 if( self->pFileState != Opened && self->pFileState != Recovering )
1658
1659 Log *log = DefaultEnv::GetLog();
1660 log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1661 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1662 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1663
1664 Message *msg;
1665 ClientSyncRequest *req;
1666 MessageUtils::CreateRequest( msg, req );
1667
1668 req->requestid = kXR_sync;
1669 memcpy( req->fhandle, self->pFileHandle, 4 );
1670
1671 MessageSendParams params;
1672 params.timeout = timeout;
1673 params.followRedirects = false;
1674 params.stateful = true;
1676
1678 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1679
1680 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1681 }
1682
1683 //----------------------------------------------------------------------------
1684 // Truncate the file to a particular size - async
1685 //----------------------------------------------------------------------------
1686 XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1687 uint64_t size,
1688 ResponseHandler *handler,
1689 time_t timeout )
1690 {
1691 XrdSysMutexHelper scopedLock( self->pMutex );
1692
1693 if( self->pFileState == Error ) return self->pStatus;
1694
1695 if( self->pFileState != Opened && self->pFileState != Recovering )
1697
1698 Log *log = DefaultEnv::GetLog();
1699 log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1700 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1701 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1702
1703 Message *msg;
1705 MessageUtils::CreateRequest( msg, req );
1706
1707 req->requestid = kXR_truncate;
1708 memcpy( req->fhandle, self->pFileHandle, 4 );
1709 req->offset = size;
1710
1711 MessageSendParams params;
1712 params.timeout = timeout;
1713 params.followRedirects = false;
1714 params.stateful = true;
1716
1718 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1719
1720 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1721 }
1722
1723 //----------------------------------------------------------------------------
1724 // Read scattered data chunks in one operation - async
1725 //----------------------------------------------------------------------------
1726 XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1727 const ChunkList &chunks,
1728 void *buffer,
1729 ResponseHandler *handler,
1730 time_t timeout )
1731 {
1732 //--------------------------------------------------------------------------
1733 // Sanity check
1734 //--------------------------------------------------------------------------
1735 XrdSysMutexHelper scopedLock( self->pMutex );
1736
1737 if( self->pFileState == Error ) return self->pStatus;
1738
1739 if( self->pFileState != Opened && self->pFileState != Recovering )
1741
1742 Log *log = DefaultEnv::GetLog();
1743 log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1744 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1745 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1746
1747 //--------------------------------------------------------------------------
1748 // Build the message
1749 //--------------------------------------------------------------------------
1750 Message *msg;
1751 ClientReadVRequest *req;
1752 MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1753
1754 req->requestid = kXR_readv;
1755 req->dlen = sizeof(readahead_list)*chunks.size();
1756
1757 ChunkList *list = new ChunkList();
1758 char *cursor = (char*)buffer;
1759
1760 //--------------------------------------------------------------------------
1761 // Copy the chunk info
1762 //--------------------------------------------------------------------------
1763 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1764 for( size_t i = 0; i < chunks.size(); ++i )
1765 {
1766 dataChunk[i].rlen = chunks[i].length;
1767 dataChunk[i].offset = chunks[i].offset;
1768 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1769
1770 void *chunkBuffer;
1771 if( cursor )
1772 {
1773 chunkBuffer = cursor;
1774 cursor += chunks[i].length;
1775 }
1776 else
1777 chunkBuffer = chunks[i].buffer;
1778
1779 list->push_back( ChunkInfo( chunks[i].offset,
1780 chunks[i].length,
1781 chunkBuffer ) );
1782 }
1783
1784 //--------------------------------------------------------------------------
1785 // Send the message
1786 //--------------------------------------------------------------------------
1787 MessageSendParams params;
1788 params.timeout = timeout;
1789 params.followRedirects = false;
1790 params.stateful = true;
1791 params.chunkList = list;
1793
1795 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1796
1797 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1798 }
1799
1800 //------------------------------------------------------------------------
1801 // Write scattered data chunks in one operation - async
1802 //------------------------------------------------------------------------
1803 XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1804 const ChunkList &chunks,
1805 ResponseHandler *handler,
1806 time_t timeout )
1807 {
1808 //--------------------------------------------------------------------------
1809 // Sanity check
1810 //--------------------------------------------------------------------------
1811 XrdSysMutexHelper scopedLock( self->pMutex );
1812
1813 if( self->pFileState == Error ) return self->pStatus;
1814
1815 if( self->pFileState != Opened && self->pFileState != Recovering )
1817
1818 Log *log = DefaultEnv::GetLog();
1819 log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1820 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1821 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1822
1823 //--------------------------------------------------------------------------
1824 // Determine the size of the payload
1825 //--------------------------------------------------------------------------
1826
1827 // the size of write vector
1828 uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1829
1830 //--------------------------------------------------------------------------
1831 // Build the message
1832 //--------------------------------------------------------------------------
1833 Message *msg;
1835 MessageUtils::CreateRequest( msg, req, payloadSize );
1836
1837 req->requestid = kXR_writev;
1838 req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1839
1840 ChunkList *list = new ChunkList();
1841
1842 //--------------------------------------------------------------------------
1843 // Copy the chunk info
1844 //--------------------------------------------------------------------------
1845 XrdProto::write_list *writeList =
1846 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1847
1848
1849
1850 for( size_t i = 0; i < chunks.size(); ++i )
1851 {
1852 writeList[i].wlen = chunks[i].length;
1853 writeList[i].offset = chunks[i].offset;
1854 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1855
1856 list->push_back( ChunkInfo( chunks[i].offset,
1857 chunks[i].length,
1858 chunks[i].buffer ) );
1859 }
1860
1861 //--------------------------------------------------------------------------
1862 // Send the message
1863 //--------------------------------------------------------------------------
1864 MessageSendParams params;
1865 params.timeout = timeout;
1866 params.followRedirects = false;
1867 params.stateful = true;
1868 params.chunkList = list;
1870
1872 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1873
1874 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1875 }
1876
1877 //------------------------------------------------------------------------
1878 // Write scattered buffers in one operation - async
1879 //------------------------------------------------------------------------
1880 XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1881 uint64_t offset,
1882 const struct iovec *iov,
1883 int iovcnt,
1884 ResponseHandler *handler,
1885 time_t timeout )
1886 {
1887 XrdSysMutexHelper scopedLock( self->pMutex );
1888
1889 if( self->pFileState == Error ) return self->pStatus;
1890
1891 if( self->pFileState != Opened && self->pFileState != Recovering )
1893
1894 Log *log = DefaultEnv::GetLog();
1895 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1896 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1897 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1898
1899 Message *msg;
1900 ClientWriteRequest *req;
1901 MessageUtils::CreateRequest( msg, req );
1902
1903 ChunkList *list = new ChunkList();
1904
1905 uint32_t size = 0;
1906 for( int i = 0; i < iovcnt; ++i )
1907 {
1908 if( iov[i].iov_len == 0 ) continue;
1909 size += iov[i].iov_len;
1910 list->push_back( ChunkInfo( 0, iov[i].iov_len,
1911 (char*)iov[i].iov_base ) );
1912 }
1913
1914 req->requestid = kXR_write;
1915 req->offset = offset;
1916 req->dlen = size;
1917 memcpy( req->fhandle, self->pFileHandle, 4 );
1918
1919 MessageSendParams params;
1920 params.timeout = timeout;
1921 params.followRedirects = false;
1922 params.stateful = true;
1923 params.chunkList = list;
1924
1926
1928 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1929
1930 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1931 }
1932
1933 //------------------------------------------------------------------------
1934 // Read data into scattered buffers in one operation - async
1935 //------------------------------------------------------------------------
1936 XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1937 uint64_t offset,
1938 struct iovec *iov,
1939 int iovcnt,
1940 ResponseHandler *handler,
1941 time_t timeout )
1942 {
1943 XrdSysMutexHelper scopedLock( self->pMutex );
1944
1945 if( self->pFileState == Error ) return self->pStatus;
1946
1947 if( self->pFileState != Opened && self->pFileState != Recovering )
1949
1950 Log *log = DefaultEnv::GetLog();
1951 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1952 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1953 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1954
1955 Message *msg;
1956 ClientReadRequest *req;
1957 MessageUtils::CreateRequest( msg, req );
1958
1959 // calculate the total read size
1960 size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1961 {
1962 return acc + rhs.iov_len;
1963 } );
1964 req->requestid = kXR_read;
1965 req->offset = offset;
1966 req->rlen = size;
1968 memcpy( req->fhandle, self->pFileHandle, 4 );
1969
1970 ChunkList *list = new ChunkList();
1971 list->reserve( iovcnt );
1972 uint64_t choff = offset;
1973 for( int i = 0; i < iovcnt; ++i )
1974 {
1975 list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1976 choff += iov[i].iov_len;
1977 }
1978
1980 MessageSendParams params;
1981 params.timeout = timeout;
1982 params.followRedirects = false;
1983 params.stateful = true;
1984 params.chunkList = list;
1986 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1987
1988 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1989 }
1990
1991
1992 //----------------------------------------------------------------------------
1993 // Performs a custom operation on an open file, server implementation
1994 // dependent - async
1995 //----------------------------------------------------------------------------
1996 XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1997 QueryCode::Code queryCode,
1998 const Buffer &arg,
1999 ResponseHandler *handler,
2000 time_t timeout )
2001 {
2002 XrdSysMutexHelper scopedLock( self->pMutex );
2003
2004 if( self->pFileState == Error ) return self->pStatus;
2005
2006 if( self->pFileState != Opened && self->pFileState != Recovering )
2008
2009 Log *log = DefaultEnv::GetLog();
2010 log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
2011 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2012 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2013
2014 Message *msg;
2015 ClientQueryRequest *req;
2016 MessageUtils::CreateRequest( msg, req, arg.GetSize() );
2017
2018 req->requestid = kXR_query;
2019 req->infotype = queryCode;
2020 req->dlen = arg.GetSize();
2021 memcpy( req->fhandle, self->pFileHandle, 4 );
2022 msg->Append( arg.GetBuffer(), arg.GetSize(), sizeof(ClientQueryRequest) );
2023
2024 MessageSendParams params;
2025 params.timeout = timeout;
2026 params.followRedirects = false;
2027 params.stateful = true;
2029
2031 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2032
2033 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2034 }
2035
2036 //----------------------------------------------------------------------------
2037 // Get access token to a file - async
2038 //----------------------------------------------------------------------------
2039 XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
2040 ResponseHandler *handler,
2041 time_t timeout )
2042 {
2043 XrdSysMutexHelper scopedLock( self->pMutex );
2044
2045 if( self->pFileState == Error ) return self->pStatus;
2046
2047 if( self->pFileState != Opened && self->pFileState != Recovering )
2049
2050 Log *log = DefaultEnv::GetLog();
2051 log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
2052 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2053 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2054
2055 Message *msg;
2056 ClientQueryRequest *req;
2057 MessageUtils::CreateRequest( msg, req );
2058
2059 req->requestid = kXR_query;
2060 req->infotype = kXR_Qvisa;
2061 memcpy( req->fhandle, self->pFileHandle, 4 );
2062
2063 MessageSendParams params;
2064 params.timeout = timeout;
2065 params.followRedirects = false;
2066 params.stateful = true;
2068
2070 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2071
2072 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2073 }
2074
2075 //------------------------------------------------------------------------
2076 // Set extended attributes - async
2077 //------------------------------------------------------------------------
2078 XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
2079 const std::vector<xattr_t> &attrs,
2080 ResponseHandler *handler,
2081 time_t timeout )
2082 {
2083 XrdSysMutexHelper scopedLock( self->pMutex );
2084
2085 if( self->pFileState == Error ) return self->pStatus;
2086
2087 if( self->pFileState != Opened && self->pFileState != Recovering )
2089
2090 Log *log = DefaultEnv::GetLog();
2091 log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
2092 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2093 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2094
2095 //--------------------------------------------------------------------------
2096 // Issue a new fattr get request
2097 //--------------------------------------------------------------------------
2098 return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
2099 }
2100
2101 //------------------------------------------------------------------------
2102 // Get extended attributes - async
2103 //------------------------------------------------------------------------
2104 XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
2105 const std::vector<std::string> &attrs,
2106 ResponseHandler *handler,
2107 time_t timeout )
2108 {
2109 XrdSysMutexHelper scopedLock( self->pMutex );
2110
2111 if( self->pFileState == Error ) return self->pStatus;
2112
2113 if( self->pFileState != Opened && self->pFileState != Recovering )
2115
2116 Log *log = DefaultEnv::GetLog();
2117 log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
2118 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2119 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2120
2121 //--------------------------------------------------------------------------
2122 // Issue a new fattr get request
2123 //--------------------------------------------------------------------------
2124 return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
2125 }
2126
2127 //------------------------------------------------------------------------
2128 // Delete extended attributes - async
2129 //------------------------------------------------------------------------
2130 XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
2131 const std::vector<std::string> &attrs,
2132 ResponseHandler *handler,
2133 time_t timeout )
2134 {
2135 XrdSysMutexHelper scopedLock( self->pMutex );
2136
2137 if( self->pFileState == Error ) return self->pStatus;
2138
2139 if( self->pFileState != Opened && self->pFileState != Recovering )
2141
2142 Log *log = DefaultEnv::GetLog();
2143 log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2144 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2145 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2146
2147 //--------------------------------------------------------------------------
2148 // Issue a new fattr del request
2149 //--------------------------------------------------------------------------
2150 return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2151 }
2152
2153 //------------------------------------------------------------------------
2154 // List extended attributes - async
2155 //------------------------------------------------------------------------
2156 XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2157 ResponseHandler *handler,
2158 time_t timeout )
2159 {
2160 XrdSysMutexHelper scopedLock( self->pMutex );
2161
2162 if( self->pFileState == Error ) return self->pStatus;
2163
2164 if( self->pFileState != Opened && self->pFileState != Recovering )
2166
2167 Log *log = DefaultEnv::GetLog();
2168 log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2169 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2170 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2171
2172 //--------------------------------------------------------------------------
2173 // Issue a new fattr get request
2174 //--------------------------------------------------------------------------
2175 static const std::vector<std::string> nothing;
2176 return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2177 nothing, handler, timeout );
2178 }
2179
2180 //------------------------------------------------------------------------
2190 //------------------------------------------------------------------------
2191 XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2192 kXR_char code,
2193 ResponseHandler *handler,
2194 time_t timeout )
2195 {
2196 XrdSysMutexHelper scopedLock( self->pMutex );
2197
2198 if( self->pFileState == Error ) return self->pStatus;
2199
2200 if( self->pFileState != Opened && self->pFileState != Recovering )
2202
2203 Log *log = DefaultEnv::GetLog();
2204 log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2205 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2206 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2207
2208 Message *msg;
2210 MessageUtils::CreateRequest( msg, req );
2211
2212 req->requestid = kXR_chkpoint;
2213 req->opcode = code;
2214 memcpy( req->fhandle, self->pFileHandle, 4 );
2215
2216 MessageSendParams params;
2217 params.timeout = timeout;
2218 params.followRedirects = false;
2219 params.stateful = true;
2220
2222
2224 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2225
2226 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2227 }
2228
2229 //------------------------------------------------------------------------
2239 //------------------------------------------------------------------------
2240 XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2241 uint64_t offset,
2242 uint32_t size,
2243 const void *buffer,
2244 ResponseHandler *handler,
2245 time_t timeout )
2246 {
2247 XrdSysMutexHelper scopedLock( self->pMutex );
2248
2249 if( self->pFileState == Error ) return self->pStatus;
2250
2251 if( self->pFileState != Opened && self->pFileState != Recovering )
2253
2254 Log *log = DefaultEnv::GetLog();
2255 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2256 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2257 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2258
2259 Message *msg;
2261 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2262
2263 req->requestid = kXR_chkpoint;
2264 req->opcode = kXR_ckpXeq;
2265 req->dlen = 24; // as specified in the protocol specification
2266 memcpy( req->fhandle, self->pFileHandle, 4 );
2267
2269 wrtreq->requestid = kXR_write;
2270 wrtreq->offset = offset;
2271 wrtreq->dlen = size;
2272 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2273
2274 ChunkList *list = new ChunkList();
2275 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2276
2277 MessageSendParams params;
2278 params.timeout = timeout;
2279 params.followRedirects = false;
2280 params.stateful = true;
2281 params.chunkList = list;
2282
2284
2286 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2287
2288 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2289 }
2290
2291 //------------------------------------------------------------------------
2301 //------------------------------------------------------------------------
2302 XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2303 uint64_t offset,
2304 const struct iovec *iov,
2305 int iovcnt,
2306 ResponseHandler *handler,
2307 time_t timeout )
2308 {
2309 XrdSysMutexHelper scopedLock( self->pMutex );
2310
2311 if( self->pFileState == Error ) return self->pStatus;
2312
2313 if( self->pFileState != Opened && self->pFileState != Recovering )
2315
2316 Log *log = DefaultEnv::GetLog();
2317 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2318 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2319 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2320
2321 Message *msg;
2323 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2324
2325 req->requestid = kXR_chkpoint;
2326 req->opcode = kXR_ckpXeq;
2327 req->dlen = 24; // as specified in the protocol specification
2328 memcpy( req->fhandle, self->pFileHandle, 4 );
2329
2330 ChunkList *list = new ChunkList();
2331 uint32_t size = 0;
2332 for( int i = 0; i < iovcnt; ++i )
2333 {
2334 if( iov[i].iov_len == 0 ) continue;
2335 size += iov[i].iov_len;
2336 list->push_back( ChunkInfo( 0, iov[i].iov_len,
2337 (char*)iov[i].iov_base ) );
2338 }
2339
2341 wrtreq->requestid = kXR_write;
2342 wrtreq->offset = offset;
2343 wrtreq->dlen = size;
2344 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2345
2346 MessageSendParams params;
2347 params.timeout = timeout;
2348 params.followRedirects = false;
2349 params.stateful = true;
2350 params.chunkList = list;
2351
2353
2355 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2356
2357 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2358 }
2359
2360 //----------------------------------------------------------------------------
2361 // Check if the file is open
2362 //----------------------------------------------------------------------------
2364 {
2365 XrdSysMutexHelper scopedLock( pMutex );
2366
2367 if( pFileState == Opened || pFileState == Recovering )
2368 return true;
2369 return false;
2370 }
2371
2372 //----------------------------------------------------------------------------
2373 // Set file property
2374 //----------------------------------------------------------------------------
2375 bool FileStateHandler::SetProperty( const std::string &name,
2376 const std::string &value )
2377 {
2378 XrdSysMutexHelper scopedLock( pMutex );
2379 if( name == "ReadRecovery" )
2380 {
2381 if( value == "true" ) pDoRecoverRead = true;
2382 else pDoRecoverRead = false;
2383 return true;
2384 }
2385 else if( name == "WriteRecovery" )
2386 {
2387 if( value == "true" ) pDoRecoverWrite = true;
2388 else pDoRecoverWrite = false;
2389 return true;
2390 }
2391 else if( name == "FollowRedirects" )
2392 {
2393 if( value == "true" ) pFollowRedirects = true;
2394 else pFollowRedirects = false;
2395 return true;
2396 }
2397 else if( name == "BundledClose" )
2398 {
2399 if( value == "true" ) pAllowBundledClose = true;
2400 else pAllowBundledClose = false;
2401 return true;
2402 }
2403 return false;
2404 }
2405
2406 //----------------------------------------------------------------------------
2407 // Get file property
2408 //----------------------------------------------------------------------------
2409 bool FileStateHandler::GetProperty( const std::string &name,
2410 std::string &value ) const
2411 {
2412 XrdSysMutexHelper scopedLock( pMutex );
2413 if( name == "ReadRecovery" )
2414 {
2415 if( pDoRecoverRead ) value = "true";
2416 else value = "false";
2417 return true;
2418 }
2419 else if( name == "WriteRecovery" )
2420 {
2421 if( pDoRecoverWrite ) value = "true";
2422 else value = "false";
2423 return true;
2424 }
2425 else if( name == "FollowRedirects" )
2426 {
2427 if( pFollowRedirects ) value = "true";
2428 else value = "false";
2429 return true;
2430 }
2431 else if( name == "DataServer" && pDataServer )
2432 { value = pDataServer->GetHostId(); return true; }
2433 else if( name == "LastURL" && pDataServer )
2434 { value = pDataServer->GetURL(); return true; }
2435 else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2436 { value = pWrtRecoveryRedir->GetHostId(); return true; }
2437 value = "";
2438 return false;
2439 }
2440
2441 //----------------------------------------------------------------------------
2442 // Process the results of the opening operation
2443 //----------------------------------------------------------------------------
2445 const OpenInfo *openInfo,
2446 const HostList *hostList )
2447 {
2448 Log *log = DefaultEnv::GetLog();
2449 XrdSysMutexHelper scopedLock( pMutex );
2450
2451 //--------------------------------------------------------------------------
2452 // Assign the data server and the load balancer
2453 //--------------------------------------------------------------------------
2454 std::string lastServer = pFileUrl->GetHostId();
2455 if( hostList )
2456 {
2457 delete pDataServer;
2458 delete pLoadBalancer;
2459 pLoadBalancer = 0;
2460 delete pWrtRecoveryRedir;
2461 pWrtRecoveryRedir = 0;
2462
2463 pDataServer = new URL( hostList->back().url );
2464 pDataServer->SetParams( pFileUrl->GetParams() );
2465 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2466 lastServer = pDataServer->GetHostId();
2467 HostList::const_iterator itC;
2468 URL::ParamsMap params = pDataServer->GetParams();
2469 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2470 {
2471 MessageUtils::MergeCGI( params,
2472 itC->url.GetParams(),
2473 true );
2474 }
2475 pDataServer->SetParams( params );
2476
2477 HostList::const_reverse_iterator it;
2478 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2479 if( it->loadBalancer )
2480 {
2481 pLoadBalancer = new URL( it->url );
2482 break;
2483 }
2484
2485 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2486 if( it->flags & kXR_recoverWrts )
2487 {
2488 pWrtRecoveryRedir = new URL( it->url );
2489 break;
2490 }
2491 }
2492
2493 log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2494 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2495
2496 if( pDataServer && !pDataServer->IsLocalFile() )
2497 {
2498 //------------------------------------------------------------------------
2499 // Check if we are using a secure connection
2500 //------------------------------------------------------------------------
2501 XrdCl::AnyObject isencobj;
2503 QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2504 if( st.IsOK() )
2505 {
2506 bool *isenc;
2507 isencobj.Get( isenc );
2508 pIsChannelEncrypted = isenc ? *isenc : false;
2509 delete isenc;
2510 }
2511 }
2512
2513 //--------------------------------------------------------------------------
2514 // We have failed
2515 //--------------------------------------------------------------------------
2516 pStatus = *status;
2517 if( !pStatus.IsOK() || !openInfo )
2518 {
2519 log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2520 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2521 pStatus.ToStr().c_str() );
2522 FailQueuedMessages( pStatus );
2523 pFileState = Error;
2524
2525 //------------------------------------------------------------------------
2526 // Report to monitoring
2527 //------------------------------------------------------------------------
2529 if( mon )
2530 {
2532 i.file = pFileUrl;
2533 i.status = status;
2535 mon->Event( Monitor::EvErrIO, &i );
2536 }
2537 }
2538 //--------------------------------------------------------------------------
2539 // We have succeeded
2540 //--------------------------------------------------------------------------
2541 else
2542 {
2543 //------------------------------------------------------------------------
2544 // if requested file colocation or dup was done, don't do again on reopen
2545 //------------------------------------------------------------------------
2546 pOpenFlags &= ~(OpenFlags::Dup | OpenFlags::Samefs);
2547
2548 //------------------------------------------------------------------------
2549 // Store the response info
2550 //------------------------------------------------------------------------
2551 openInfo->GetFileHandle( pFileHandle );
2552 pSessionId = openInfo->GetSessionId();
2553 if( openInfo->GetStatInfo() )
2554 {
2555 delete pStatInfo;
2556 pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2557 }
2558
2559 log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2560 "session id: %llu", (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
2561 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2562 (unsigned long long) pSessionId );
2563
2564 //------------------------------------------------------------------------
2565 // Inform the monitoring about opening success
2566 //------------------------------------------------------------------------
2567 gettimeofday( &pOpenTime, 0 );
2569 if( mon )
2570 {
2572 i.file = pFileUrl;
2573 i.dataServer = pDataServer->GetHostId();
2574 i.oFlags = pOpenFlags;
2575 i.oFlags2 = pOpenFlags>>16;
2576 i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2577 mon->Event( Monitor::EvOpen, &i );
2578 }
2579
2580 //------------------------------------------------------------------------
2581 // Resend the queued messages if any
2582 //------------------------------------------------------------------------
2583 ReSendQueuedMessages();
2584 pFileState = Opened;
2585 }
2586 }
2587
2588 //----------------------------------------------------------------------------
2589 // Process the results of the closing operation
2590 //----------------------------------------------------------------------------
2592 {
2593 Log *log = DefaultEnv::GetLog();
2594 XrdSysMutexHelper scopedLock( pMutex );
2595
2596 log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", (void*)this,
2597 pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2598 status->ToStr().c_str() );
2599
2600 log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2601 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2602
2603 MonitorClose( status );
2604 ResetMonitoringVars();
2605
2606 pStatus = *status;
2607 pFileState = Closed;
2608 }
2609
2610 //----------------------------------------------------------------------------
2611 // Handle an error while sending a stateful message
2612 //----------------------------------------------------------------------------
2613 void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2614 XRootDStatus *status,
2615 Message *message,
2616 ResponseHandler *userHandler,
2617 MessageSendParams &sendParams )
2618 {
2619 //--------------------------------------------------------------------------
2620 // It may be a redirection
2621 //--------------------------------------------------------------------------
2622 if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2623 {
2624 static const std::string root = "root", xroot = "xroot", file = "file",
2625 roots = "roots", xroots = "xroots";
2626 std::string msg = status->GetErrorMessage();
2627 if( !msg.compare( 0, root.size(), root ) ||
2628 !msg.compare( 0, xroot.size(), xroot ) ||
2629 !msg.compare( 0, file.size(), file ) ||
2630 !msg.compare( 0, roots.size(), roots ) ||
2631 !msg.compare( 0, xroots.size(), xroots ) )
2632 {
2633 FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2634 return;
2635 }
2636 }
2637
2638 //--------------------------------------------------------------------------
2639 // Handle error
2640 //--------------------------------------------------------------------------
2641 Log *log = DefaultEnv::GetLog();
2642 XrdSysMutexHelper scopedLock( self->pMutex );
2643 self->pInTheFly.erase( message );
2644
2645 log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2646 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2647 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2648
2649 //--------------------------------------------------------------------------
2650 // Report to monitoring
2651 //--------------------------------------------------------------------------
2653 if( mon )
2654 {
2656 i.file = self->pFileUrl;
2657 i.status = status;
2658
2659 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2660 switch( req->header.requestid )
2661 {
2669 }
2670
2671 mon->Event( Monitor::EvErrIO, &i );
2672 }
2673
2674 //--------------------------------------------------------------------------
2675 // The message is not recoverable
2676 // (message using a kernel buffer is not recoverable by definition)
2677 //--------------------------------------------------------------------------
2678 if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2679 {
2680 log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2681 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2682 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2683
2684 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2685 delete status;
2686 return;
2687 }
2688
2689 //--------------------------------------------------------------------------
2690 // Insert the message to the recovery queue and start the recovery
2691 // procedure if we don't have any more message in the fly
2692 //--------------------------------------------------------------------------
2693 self->pCloseReason = *status;
2694 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2695 delete status;
2696 }
2697
2698 //----------------------------------------------------------------------------
2699 // Handle stateful redirect
2700 //----------------------------------------------------------------------------
2701 void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2702 const std::string &redirectUrl,
2703 Message *message,
2704 ResponseHandler *userHandler,
2705 MessageSendParams &sendParams )
2706 {
2707 XrdSysMutexHelper scopedLock( self->pMutex );
2708 self->pInTheFly.erase( message );
2709
2710 //--------------------------------------------------------------------------
2711 // Register the state redirect url and append the new cgi information to
2712 // the file URL
2713 //--------------------------------------------------------------------------
2714 if( !self->pStateRedirect )
2715 {
2716 std::ostringstream o;
2717 self->pStateRedirect = new URL( redirectUrl );
2718 URL::ParamsMap params = self->pFileUrl->GetParams();
2719 MessageUtils::MergeCGI( params,
2720 self->pStateRedirect->GetParams(),
2721 false );
2722 self->pFileUrl->SetParams( params );
2723 }
2724
2725 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2726 }
2727
2728 //----------------------------------------------------------------------------
2729 // Handle stateful response
2730 //----------------------------------------------------------------------------
2731 void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2732 XRootDStatus *status,
2733 Message *message,
2734 AnyObject *response,
2735 HostList */*urlList*/ )
2736 {
2737 Log *log = DefaultEnv::GetLog();
2738 XrdSysMutexHelper scopedLock( self->pMutex );
2739
2740 log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2741 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2742 message->GetObfuscatedDescription().c_str() );
2743
2744 //--------------------------------------------------------------------------
2745 // Since this message may be the last "in-the-fly" and no recovery
2746 // is done if messages are in the fly, we may need to trigger recovery
2747 //--------------------------------------------------------------------------
2748 self->pInTheFly.erase( message );
2749 RunRecovery( self );
2750
2751 //--------------------------------------------------------------------------
2752 // Play with the actual response before returning it. This is a good
2753 // place to do caching in the future.
2754 //--------------------------------------------------------------------------
2755 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2756 switch( req->header.requestid )
2757 {
2758 //------------------------------------------------------------------------
2759 // Cache the stat response
2760 //------------------------------------------------------------------------
2761 case kXR_stat:
2762 {
2763 StatInfo *info = 0;
2764 response->Get( info );
2765 delete self->pStatInfo;
2766 self->pStatInfo = new StatInfo( *info );
2767 break;
2768 }
2769
2770 //------------------------------------------------------------------------
2771 // Handle read response
2772 //------------------------------------------------------------------------
2773 case kXR_read:
2774 {
2775 ++self->pRCount;
2776 self->pRBytes += req->read.rlen;
2777 break;
2778 }
2779
2780 //------------------------------------------------------------------------
2781 // Handle read response
2782 //------------------------------------------------------------------------
2783 case kXR_pgread:
2784 {
2785 ++self->pRCount;
2786 self->pRBytes += req->pgread.rlen;
2787 break;
2788 }
2789
2790 //------------------------------------------------------------------------
2791 // Handle readv response
2792 //------------------------------------------------------------------------
2793 case kXR_readv:
2794 {
2795 ++self->pVRCount;
2796 size_t segs = req->header.dlen/sizeof(readahead_list);
2797 readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2798 for( size_t i = 0; i < segs; ++i )
2799 self->pVRBytes += dataChunk[i].rlen;
2800 self->pVSegs += segs;
2801 break;
2802 }
2803
2804 //------------------------------------------------------------------------
2805 // Handle write response
2806 //------------------------------------------------------------------------
2807 case kXR_write:
2808 {
2809 ++self->pWCount;
2810 self->pWBytes += req->write.dlen;
2811 break;
2812 }
2813
2814 //------------------------------------------------------------------------
2815 // Handle write response
2816 //------------------------------------------------------------------------
2817 case kXR_pgwrite:
2818 {
2819 ++self->pWCount;
2820 self->pWBytes += req->pgwrite.dlen;
2821 break;
2822 }
2823
2824 //------------------------------------------------------------------------
2825 // Handle writev response
2826 //------------------------------------------------------------------------
2827 case kXR_writev:
2828 {
2829 ++self->pVWCount;
2830 size_t size = req->header.dlen/sizeof(readahead_list);
2831 XrdProto::write_list *wrtList =
2832 reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2833 for( size_t i = 0; i < size; ++i )
2834 self->pVWBytes += wrtList[i].wlen;
2835 break;
2836 }
2837 };
2838 }
2839
2840 //------------------------------------------------------------------------
2842 //------------------------------------------------------------------------
2843 void FileStateHandler::Tick( time_t now )
2844 {
2845 if (pMutex.CondLock())
2846 {TimeOutRequests( now );
2847 pMutex.UnLock();
2848 }
2849 }
2850
2851 //----------------------------------------------------------------------------
2852 // Declare timeout on requests being recovered
2853 //----------------------------------------------------------------------------
2855 {
2856 if( !pToBeRecovered.empty() )
2857 {
2858 Log *log = DefaultEnv::GetLog();
2859 log->Dump( FileMsg, "[%p@%s] Got a timer event", (void*)this,
2860 pFileUrl->GetObfuscatedURL().c_str() );
2861 RequestList::iterator it;
2863 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2864 {
2865 if( it->params.expires <= now )
2866 {
2867 jobMan->QueueJob( new ResponseJob(
2868 it->handler,
2870 0, it->params.hostList ) );
2871 it = pToBeRecovered.erase( it );
2872 }
2873 else
2874 ++it;
2875 }
2876 }
2877 }
2878
2879 //----------------------------------------------------------------------------
2880 // Called in the child process after the fork
2881 //----------------------------------------------------------------------------
2883 {
2884 Log *log = DefaultEnv::GetLog();
2885
2886 if( pFileState == Closed || pFileState == Error )
2887 return;
2888
2889 if( (IsReadOnly() && pDoRecoverRead) ||
2890 (!IsReadOnly() && pDoRecoverWrite) )
2891 {
2892 log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2893 "process %d", (void*)this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2894 pFileState = Recovering;
2895 pInTheFly.clear();
2896 pToBeRecovered.clear();
2897 }
2898 else
2899 pFileState = Error;
2900 }
2901
2902 //------------------------------------------------------------------------
2903 // Try other data server
2904 //------------------------------------------------------------------------
2905 XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, time_t timeout )
2906 {
2907 XrdSysMutexHelper scopedLock( self->pMutex );
2908
2909 if( self->pFileState != Opened || !self->pLoadBalancer )
2911
2912 self->pFileState = Recovering;
2913
2914 Log *log = DefaultEnv::GetLog();
2915 log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2916 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2917
2918 // merge CGI
2919 auto lbcgi = self->pLoadBalancer->GetParams();
2920 auto dtcgi = self->pDataServer->GetParams();
2921 MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2922 // update tried CGI
2923 auto itr = lbcgi.find( "tried" );
2924 if( itr == lbcgi.end() )
2925 lbcgi["tried"] = self->pDataServer->GetHostName();
2926 else
2927 {
2928 std::string tried = itr->second;
2929 tried += "," + self->pDataServer->GetHostName();
2930 lbcgi["tried"] = tried;
2931 }
2932 self->pLoadBalancer->SetParams( lbcgi );
2933
2934 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2935 }
2936
2937 //------------------------------------------------------------------------
2938 // Generic implementation of xattr operation
2939 //------------------------------------------------------------------------
2940 template<typename T>
2941 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2942 kXR_char subcode,
2943 kXR_char options,
2944 const std::vector<T> &attrs,
2945 ResponseHandler *handler,
2946 time_t timeout )
2947 {
2948 //--------------------------------------------------------------------------
2949 // Issue a new fattr request
2950 //--------------------------------------------------------------------------
2951 Message *msg;
2952 ClientFattrRequest *req;
2953 MessageUtils::CreateRequest( msg, req );
2954
2955 req->requestid = kXR_fattr;
2956 req->subcode = subcode;
2957 req->numattr = attrs.size();
2958 req->options = options;
2959 memcpy( req->fhandle, self->pFileHandle, 4 );
2961 if( !st.IsOK() ) return st;
2962
2963 MessageSendParams params;
2964 params.timeout = timeout;
2965 params.followRedirects = false;
2966 params.stateful = true;
2968
2970 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2971
2972 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2973 }
2974
2975 //----------------------------------------------------------------------------
2976 // Send a message to a host or put it in the recovery queue
2977 //----------------------------------------------------------------------------
2978 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2979 const URL &url,
2980 Message *msg,
2981 ResponseHandler *handler,
2982 MessageSendParams &sendParams )
2983 {
2984 //--------------------------------------------------------------------------
2985 // Recovering
2986 //--------------------------------------------------------------------------
2987 if( self->pFileState == Recovering )
2988 {
2989 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2990 }
2991
2992 //--------------------------------------------------------------------------
2993 // Trying to send
2994 //--------------------------------------------------------------------------
2995 if( self->pFileState == Opened )
2996 {
2997 msg->SetSessionId( self->pSessionId );
2998 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2999
3000 //------------------------------------------------------------------------
3001 // Invalid session id means that the connection has been broken while we
3002 // were idle so we haven't been informed about this fact earlier.
3003 //------------------------------------------------------------------------
3004 if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
3005 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
3006
3007 if( st.IsOK() )
3008 self->pInTheFly.insert(msg);
3009 else
3010 delete handler;
3011 return st;
3012 }
3013 return Status( stError, errInvalidOp );
3014 }
3015
3016 //----------------------------------------------------------------------------
3017 // Check if the stateful error is recoverable
3018 //----------------------------------------------------------------------------
3019 bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
3020 {
3021 const auto recoverable_errors = {
3028 };
3029
3030 if (pDoRecoverRead || pDoRecoverWrite)
3031 for (const auto error : recoverable_errors)
3032 if (status.code == error)
3033 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
3034
3035 return false;
3036 }
3037
3038 //----------------------------------------------------------------------------
3039 // Check if the file is open for read only
3040 //----------------------------------------------------------------------------
3041 bool FileStateHandler::IsReadOnly() const
3042 {
3043 // Keeping the check for append (with a cast) as this was previously tested,
3044 // but OpenFlags::Flags does not currently enumerate the Append flag
3045 if( (pOpenFlags & OpenFlags::Read) && !(pOpenFlags & OpenFlags::Update) &&
3046 !(pOpenFlags & static_cast<OpenFlags::Flags>(kXR_open_apnd)) )
3047 return true;
3048 return false;
3049 }
3050
3051 //----------------------------------------------------------------------------
3052 // Recover a message
3053 //----------------------------------------------------------------------------
3054 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
3055 RequestData rd,
3056 bool callbackOnFailure )
3057 {
3058 self->pFileState = Recovering;
3059
3060 Log *log = DefaultEnv::GetLog();
3061 log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
3062 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3063 rd.request->GetObfuscatedDescription().c_str() );
3064
3065 Status st = RunRecovery( self );
3066 if( st.IsOK() )
3067 {
3068 self->pToBeRecovered.push_back( rd );
3069 return st;
3070 }
3071
3072 if( callbackOnFailure )
3073 self->FailMessage( rd, st );
3074
3075 return st;
3076 }
3077
3078 //----------------------------------------------------------------------------
3079 // Run the recovery procedure if appropriate
3080 //----------------------------------------------------------------------------
3081 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
3082 {
3083 if( self->pFileState != Recovering )
3084 return Status();
3085
3086 if( !self->pInTheFly.empty() )
3087 return Status();
3088
3089 Log *log = DefaultEnv::GetLog();
3090 log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", (void*)self.get(),
3091 self->pFileUrl->GetObfuscatedURL().c_str() );
3092
3093 Status st;
3094 if( self->pStateRedirect )
3095 {
3096 SendClose( self, 0 );
3097 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
3098 delete self->pStateRedirect; self->pStateRedirect = 0;
3099 }
3100 else if( self->IsReadOnly() && self->pLoadBalancer )
3101 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
3102 else
3103 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
3104
3105 if( !st.IsOK() )
3106 {
3107 self->pFileState = Error;
3108 self->pStatus = st;
3109 self->FailQueuedMessages( st );
3110 }
3111
3112 return st;
3113 }
3114
3115 //----------------------------------------------------------------------------
3116 // Send a close and ignore the response
3117 //----------------------------------------------------------------------------
3118 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
3119 time_t timeout )
3120 {
3121 Message *msg;
3122 ClientCloseRequest *req;
3123 MessageUtils::CreateRequest( msg, req );
3124
3125 req->requestid = kXR_close;
3126 memcpy( req->fhandle, self->pFileHandle, 4 );
3127
3129 msg->SetSessionId( self->pSessionId );
3130 ResponseHandler *handler = ResponseHandler::Wrap(
3131 [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
3132 MessageSendParams params;
3133 params.timeout = timeout;
3134 params.followRedirects = false;
3135 params.stateful = true;
3136
3138
3139 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3140 }
3141
3142 //----------------------------------------------------------------------------
3143 // Re-open the current file at a given server
3144 //----------------------------------------------------------------------------
3145 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3146 const URL &url,
3147 time_t timeout )
3148 {
3149 Log *log = DefaultEnv::GetLog();
3150 log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3151 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3152
3153 //--------------------------------------------------------------------------
3154 // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3155 // procedure to delete a file that has been partially updated or fail it
3156 // because a partially uploaded file already exists.
3157 //--------------------------------------------------------------------------
3158 if( self->pOpenFlags & OpenFlags::Delete)
3159 {
3160 self->pOpenFlags &= ~OpenFlags::Delete;
3161 self->pOpenFlags |= OpenFlags::Update;
3162 }
3163
3164 self->pOpenFlags &= ~OpenFlags::New;
3165
3166 Message *msg;
3167 ClientOpenRequest *req;
3168 URL u = url;
3169
3170 if( url.GetPath().empty() )
3171 u.SetPath( self->pFileUrl->GetPath() );
3172
3173 std::string path = u.GetPathWithFilteredParams();
3174 MessageUtils::CreateRequest( msg, req, path.length() );
3175
3176 req->requestid = kXR_open;
3177 req->mode = self->pOpenMode;
3178 req->options = (self->pOpenFlags & 0xffff);
3179 req->dlen = path.length();
3180 URL sendUrl;
3181 XRootDStatus st = FillFhTempl( self, url, msg, sendUrl );
3182 if( !st.IsOK() )
3183 {
3184 self->pStatus = st;
3185 self->pFileState = Closed;
3186 return st;
3187 }
3188 msg->Append( path.c_str(), path.length(), 24 );
3189
3190 // create a new reopen handler
3191 // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3192 // until we know that 'SendMessage' was successful)
3193 OpenHandler *openHandler = new OpenHandler( self, 0 );
3194 MessageSendParams params; params.timeout = timeout;
3197
3198 //--------------------------------------------------------------------------
3199 // Issue the open request
3200 //--------------------------------------------------------------------------
3201 st = self->IssueRequest( sendUrl, msg, openHandler, params );
3202
3203 // if there was a problem destroy the open handler
3204 if( !st.IsOK() )
3205 {
3206 delete openHandler;
3207 self->pStatus = st;
3208 self->pFileState = Closed;
3209 }
3210 return st;
3211 }
3212
3213 //------------------------------------------------------------------------
3214 // Fail a message
3215 //------------------------------------------------------------------------
3216 void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3217 {
3218 Log *log = DefaultEnv::GetLog();
3219 log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3220 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3221 rd.request->GetObfuscatedDescription().c_str(),
3222 status.ToStr().c_str() );
3223
3224 StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3225 if( !sh )
3226 {
3227 Log *log = DefaultEnv::GetLog();
3228 log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3229 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3230 rd.request->GetObfuscatedDescription().c_str() );
3231 return;
3232 }
3233
3234 JobManager *jobMan = DefaultEnv::GetPostMaster()->GetJobManager();
3235 ResponseHandler *userHandler = sh->GetUserHandler();
3236 jobMan->QueueJob( new ResponseJob(
3237 userHandler,
3238 new XRootDStatus( status ),
3239 0, rd.params.hostList ) );
3240
3241 delete sh;
3242 }
3243
3244 //----------------------------------------------------------------------------
3245 // Fail queued messages
3246 //----------------------------------------------------------------------------
3247 void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3248 {
3249 RequestList::iterator it;
3250 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3251 FailMessage( *it, status );
3252 pToBeRecovered.clear();
3253 }
3254
3255 //------------------------------------------------------------------------
3256 // Re-send queued messages
3257 //------------------------------------------------------------------------
3258 void FileStateHandler::ReSendQueuedMessages()
3259 {
3260 RequestList::iterator it;
3261 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3262 {
3263 it->request->SetSessionId( pSessionId );
3264 ReWriteFileHandle( it->request );
3265 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3266 it->handler, it->params );
3267 if( !st.IsOK() )
3268 FailMessage( *it, st );
3269 }
3270 pToBeRecovered.clear();
3271 }
3272
3273 //------------------------------------------------------------------------
3274 // Re-write file handle
3275 //------------------------------------------------------------------------
3276 void FileStateHandler::ReWriteFileHandle( Message *msg )
3277 {
3278 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
3279 switch( hdr->requestid )
3280 {
3281 case kXR_read:
3282 {
3283 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
3284 memcpy( req->fhandle, pFileHandle, 4 );
3285 break;
3286 }
3287 case kXR_write:
3288 {
3289 ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
3290 memcpy( req->fhandle, pFileHandle, 4 );
3291 break;
3292 }
3293 case kXR_sync:
3294 {
3295 ClientSyncRequest *req = (ClientSyncRequest*)msg->GetBuffer();
3296 memcpy( req->fhandle, pFileHandle, 4 );
3297 break;
3298 }
3299 case kXR_truncate:
3300 {
3301 ClientTruncateRequest *req = (ClientTruncateRequest*)msg->GetBuffer();
3302 memcpy( req->fhandle, pFileHandle, 4 );
3303 break;
3304 }
3305 case kXR_readv:
3306 {
3307 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
3308 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3309 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3310 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3311 break;
3312 }
3313 case kXR_writev:
3314 {
3315 ClientWriteVRequest *req =
3316 reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3317 XrdProto::write_list *wrtList =
3318 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3319 size_t size = req->dlen / sizeof(XrdProto::write_list);
3320 for( size_t i = 0; i < size; ++i )
3321 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3322 break;
3323 }
3324 case kXR_pgread:
3325 {
3326 ClientPgReadRequest *req = (ClientPgReadRequest*) msg->GetBuffer();
3327 memcpy( req->fhandle, pFileHandle, 4 );
3328 break;
3329 }
3330 case kXR_pgwrite:
3331 {
3332 ClientPgWriteRequest *req = (ClientPgWriteRequest*) msg->GetBuffer();
3333 memcpy( req->fhandle, pFileHandle, 4 );
3334 break;
3335 }
3336 }
3337
3338 Log *log = DefaultEnv::GetLog();
3339 log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3340 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3341 *((uint32_t*)pFileHandle) );
3343 }
3344
3345 //----------------------------------------------------------------------------
3346 // Dispatch monitoring information on close
3347 //----------------------------------------------------------------------------
3348 void FileStateHandler::MonitorClose( const XRootDStatus *status )
3349 {
3350 Monitor *mon = DefaultEnv::GetMonitor();
3351 if( mon )
3352 {
3353 Monitor::CloseInfo i;
3354 i.file = pFileUrl;
3355 i.oTOD = pOpenTime;
3356 gettimeofday( &i.cTOD, 0 );
3357 i.rBytes = pRBytes;
3358 i.vrBytes = pVRBytes;
3359 i.wBytes = pWBytes;
3360 i.vwBytes = pVWBytes;
3361 i.vSegs = pVSegs;
3362 i.rCount = pRCount;
3363 i.vCount = pVRCount;
3364 i.wCount = pWCount;
3365 i.status = status;
3366 mon->Event( Monitor::EvClose, &i );
3367 }
3368 }
3369
3370 XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3371 Message *msg,
3372 ResponseHandler *handler,
3373 MessageSendParams &sendParams )
3374 {
3375 // first handle Metalinks
3376 if( pUseVirtRedirector && url.IsMetalink() )
3377 return MessageUtils::RedirectMessage( url, msg, handler,
3378 sendParams, pLFileHandler );
3379
3380 // than local file access
3381 if( url.IsLocalFile() )
3382 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3383
3384 // and finally ordinary XRootD requests
3385 return MessageUtils::SendMessage( url, msg, handler,
3386 sendParams, pLFileHandler );
3387 }
3388
3389 //------------------------------------------------------------------------
3390 // Send a write request with payload being stored in a kernel buffer
3391 //------------------------------------------------------------------------
3392 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3393 uint64_t offset,
3394 uint32_t length,
3395 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3396 ResponseHandler *handler,
3397 time_t timeout )
3398 {
3399 //--------------------------------------------------------------------------
3400 // Create the write request
3401 //--------------------------------------------------------------------------
3402 XrdSysMutexHelper scopedLock( self->pMutex );
3403
3404 if( self->pFileState != Opened && self->pFileState != Recovering )
3405 return XRootDStatus( stError, errInvalidOp );
3406
3407 Log *log = DefaultEnv::GetLog();
3408 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3409 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3410 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3411
3412 Message *msg;
3413 ClientWriteRequest *req;
3414 MessageUtils::CreateRequest( msg, req );
3415
3416 req->requestid = kXR_write;
3417 req->offset = offset;
3418 req->dlen = length;
3419 memcpy( req->fhandle, self->pFileHandle, 4 );
3420
3421 MessageSendParams params;
3422 params.timeout = timeout;
3423 params.followRedirects = false;
3424 params.stateful = true;
3425 params.kbuff = kbuff.release();
3426 params.chunkList = new ChunkList();
3427
3429
3431 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3432
3433 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3434 }
3435
3436 //------------------------------------------------------------------------
3437 // Fills in the file template value and optiont fields that need the
3438 // template (i.e. samefs and dup) in an Open message request
3439 //------------------------------------------------------------------------
3440 XRootDStatus FileStateHandler::FillFhTempl(
3441 std::shared_ptr<FileStateHandler> &self,
3442 const URL &url, Message *msg, URL &sendUrl)
3443 {
3444 ClientOpenRequest *req = (ClientOpenRequest*)msg->GetBuffer();
3445 sendUrl = url;
3446
3447 if( !self->NeedFileTempl() )
3448 {
3449 // template file not requireed
3450 return XRootDStatus();
3451 }
3452
3453 using wp = std::weak_ptr<FileStateHandler>;
3454 if( !self->pTemplateFileWp.owner_before(wp{}) &&
3455 !wp{}.owner_before(self->pTemplateFileWp) )
3456 {
3457 // no tempalte file was set
3458 return XRootDStatus( stError, errInvalidArgs, 0,
3459 "File flags required a template file" );
3460 }
3461
3462 // all the options that need template
3463 if( self->pOpenFlags & OpenFlags::Dup )
3464 req->optiont |= kXR_dup;
3465 if( self->pOpenFlags & OpenFlags::Samefs )
3466 req->optiont |= kXR_samefs;
3467
3468 std::shared_ptr<FileStateHandler> tfp = self->pTemplateFileWp.lock();
3469 if(!tfp)
3470 return XRootDStatus( stError, errInvalidArgs, 0,
3471 "Template file object does not exist" );
3472
3473 XrdSysMutexHelper scopedLock( tfp->pMutex );
3474
3475 if( tfp->pFileState != Opened )
3476 return XRootDStatus( stError, errInvalidOp, 0,
3477 "Template file not open" );
3478
3479 if (!tfp->pDataServer || !tfp->pFileHandle)
3480 return XRootDStatus( stError, errInvalidArgs, 0,
3481 "Template file not connected" );
3482
3483 sendUrl.SetHostPort( tfp->pDataServer->GetHostName(),tfp->pDataServer->GetPort() );
3484 sendUrl.SetUserName( tfp->pDataServer->GetUserName() );
3485 msg->SetSessionId( tfp->pSessionId );
3486 memcpy( req->fhtemplt, tfp->pFileHandle, sizeof(req->fhtemplt) );
3487
3488 if( !Utils::HasKSameFS( sendUrl ) )
3489 return XRootDStatus( stError, errNotSupported );
3490
3491 return XRootDStatus();
3492 }
3493
3494 //------------------------------------------------------------------------
3495 // Clone file ranges into current file
3496 //------------------------------------------------------------------------
3497 XRootDStatus FileStateHandler::Clone(std::shared_ptr<FileStateHandler> &self,
3498 const CloneLocations &locs,
3499 ResponseHandler *handler,
3500 time_t timeout )
3501 {
3502 XrdSysMutexHelper scopedLock( self->pMutex );
3503
3504 if( self->pFileState == Error ) return self->pStatus;
3505
3506 if( self->pFileState != Opened && self->pFileState != Recovering )
3508
3509 if( !Utils::HasKSameFS( *self->pDataServer ) )
3511
3512 Log *log = DefaultEnv::GetLog();
3513 log->Debug( FileMsg, "[%p@%s] Sending a clone command for handle %#x to %s",
3514 self.get(), self->pFileUrl->GetURL().c_str(),
3515 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3516
3517 Message *msg;
3518 ClientReadRequest *req;
3519
3520 size_t nrange = locs.locations.size();
3521
3522 MessageUtils::CreateRequest( msg, req, sizeof(XrdProto::clone_list)*nrange );
3523
3524 req->requestid = kXR_clone;
3525 req->dlen = sizeof(XrdProto::clone_list)*nrange;
3526 memcpy( req->fhandle, self->pFileHandle, 4 );
3527
3529 int idx=0;
3530 for(auto &loc: locs.locations)
3531 {
3532 if( !loc.file )
3533 return XRootDStatus( stError, errInvalidOp, 0,
3534 "Template file not available" );
3535
3536 FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>(loc.file.get());
3537 if( !fht )
3538 return XRootDStatus( stError, errInvalidOp, 0,
3539 "Template file invalid" );
3540
3541 std::shared_ptr<FileStateHandler> tfp = fht->pTemplateFileWp.lock();
3542 if( !tfp )
3543 return XRootDStatus( stError, errInvalidOp, 0,
3544 "Template file object does not exist" );
3545
3546 XrdSysMutexHelper scopedLock( tfp->pMutex );
3547 if( tfp->pFileState != Opened )
3548 return XRootDStatus( stError, errInvalidOp, 0,
3549 "Template file not open" );
3550
3551 if( tfp->pSessionId != self->pSessionId )
3552 return XRootDStatus( stError, errInvalidOp, 0,
3553 "Clone source not at same location as destination" );
3554
3555 memcpy( cl[idx].srcFH, tfp->pFileHandle, 4 );
3556 cl[idx].srcOffs = loc.srcOffs;
3557 cl[idx].srcLen = loc.srcLen;
3558 cl[idx].dstOffs = loc.dstOffs;
3559 ++idx;
3560 }
3561
3563 MessageSendParams params;
3564 params.timeout = timeout;
3565 params.followRedirects = false;
3566 params.stateful = true;
3568 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3569
3570 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3571 }
3572}
kXR_unt16 requestid
Definition XProtocol.hh:511
kXR_unt16 requestid
Definition XProtocol.hh:666
kXR_unt16 requestid
Definition XProtocol.hh:847
@ kXR_fattrDel
Definition XProtocol.hh:300
@ kXR_fattrSet
Definition XProtocol.hh:303
@ kXR_fattrList
Definition XProtocol.hh:302
@ kXR_fattrGet
Definition XProtocol.hh:301
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:565
kXR_char fhandle[4]
Definition XProtocol.hh:823
struct ClientPgReadRequest pgread
Definition XProtocol.hh:903
kXR_char fhandle[4]
Definition XProtocol.hh:848
kXR_char fhandle[4]
Definition XProtocol.hh:812
kXR_unt16 requestid
Definition XProtocol.hh:680
@ kXR_virtReadv
Definition XProtocol.hh:152
kXR_char fhtemplt[4]
Definition XProtocol.hh:516
kXR_unt16 options
Definition XProtocol.hh:513
static const int kXR_ckpXeq
Definition XProtocol.hh:218
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:904
kXR_unt16 requestid
Definition XProtocol.hh:257
@ kXR_async
Definition XProtocol.hh:488
@ kXR_open_apnd
Definition XProtocol.hh:492
@ kXR_retstat
Definition XProtocol.hh:493
struct ClientRequestHdr header
Definition XProtocol.hh:887
kXR_char fhandle[4]
Definition XProtocol.hh:543
#define kXR_recoverWrts
kXR_unt16 optiont
Definition XProtocol.hh:514
kXR_char fhandle[4]
Definition XProtocol.hh:681
kXR_char fhandle[4]
Definition XProtocol.hh:258
kXR_unt16 requestid
Definition XProtocol.hh:159
kXR_char fhandle[4]
Definition XProtocol.hh:669
@ kXR_read
Definition XProtocol.hh:126
@ kXR_open
Definition XProtocol.hh:123
@ kXR_writev
Definition XProtocol.hh:144
@ kXR_clone
Definition XProtocol.hh:145
@ kXR_readv
Definition XProtocol.hh:138
@ kXR_sync
Definition XProtocol.hh:129
@ kXR_fattr
Definition XProtocol.hh:133
@ kXR_query
Definition XProtocol.hh:114
@ kXR_write
Definition XProtocol.hh:132
@ kXR_truncate
Definition XProtocol.hh:141
@ kXR_stat
Definition XProtocol.hh:130
@ kXR_pgread
Definition XProtocol.hh:143
@ kXR_chkpoint
Definition XProtocol.hh:125
@ kXR_close
Definition XProtocol.hh:116
@ kXR_pgwrite
Definition XProtocol.hh:139
struct ClientReadRequest read
Definition XProtocol.hh:909
kXR_int32 rlen
Definition XProtocol.hh:696
kXR_unt16 requestid
Definition XProtocol.hh:808
kXR_unt16 requestid
Definition XProtocol.hh:822
kXR_int64 offset
Definition XProtocol.hh:697
#define kXR_PROTPGRWVERSION
Definition XProtocol.hh:73
@ kXR_dup
Definition XProtocol.hh:503
@ kXR_samefs
Definition XProtocol.hh:504
struct ClientWriteRequest write
Definition XProtocol.hh:918
kXR_unt16 requestid
Definition XProtocol.hh:706
@ kXR_Qvisa
Definition XProtocol.hh:656
unsigned char kXR_char
Definition XPtypes.hh:65
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, time_t timeout)
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
An interface for file plug-ins.
std::weak_ptr< FileStateHandler > pTemplateFileWp
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, time_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, time_t timeout)
Try other data server.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Clone(std::shared_ptr< FileStateHandler > &self, const CloneLocations &locs, ResponseHandler *handler, time_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, time_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, time_t timeout=0)
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Opened
Opening has succeeded.
@ Recovering
Recovering from an error.
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus OpenUsingTemplate(std::shared_ptr< FileStateHandler > &self, ExportedFileTemplate *templ, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PreRead(std::shared_ptr< FileStateHandler > &self, const TractList &tracts, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, time_t timeout=0)
bool IsOpen() const
Check if the file is open.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes).
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
void SetHostPort(const std::string &hostName, int port)
Definition XrdClURL.hh:206
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
Definition XrdClURL.cc:331
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition XrdClURL.cc:498
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:225
bool IsLocalFile() const
Definition XrdClURL.cc:474
void SetUserName(const std::string &userName)
Set the username.
Definition XrdClURL.hh:143
static bool HasKSameFS(const XrdCl::URL &url)
Check if given server supports kXR_clone and kXR_samefs.
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint64_t FileMsg
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
std::vector< TractInfo > TractList
List of Tracts.
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errNotSupported
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
T & To(AnyObject &any)
const uint16_t errRedirect
const uint16_t errSocketDisconnected
none object for initializing empty Optional
XrdSysError Log
Definition XrdConfig.cc:113
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition XProtocol.hh:328
kXR_char fhandle[4]
Definition XProtocol.hh:318
kXR_unt16 requestid
Definition XProtocol.hh:317
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< CloneLocation > locations
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
uint16_t oFlags2
OpenFlags upper 16 bits.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
Open flags, may be or'd when appropriate.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Samefs
Open file on the same filesystem as another.
@ Update
Open for reading and writing.
@ Dup
Open file duplicating content from another.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
Code
XRootD query request codes.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted