百度文件系統(tǒng)bfs源碼分析系列(一)

這一篇會分析寫文件的整個工作流程,因為在分析源碼時,單獨分析下src/metaserver/下的幾個文件有些疑問,在metaserver_main.cc啟動時,會注冊rpc服務(wù)并處理相應(yīng)的消息:

 45     // rpc_server
 46     sofa::pbrpc::RpcServerOptions options;
 47 
 48     sofa::pbrpc::RpcServer rpc_server(options);
 49 
 50     // Server
 51     baidu::bfs::metaserver::MetaServer* metaserver_service = new baidu::bfs::metaserver::MetaServerImpl();
 52 
 53     // Register
 54     if (!rpc_server.RegisterService(metaserver_service)) {
 55         return EXIT_FAILURE;
 56     }
 57 
 58     // Start
 59     std::vector<std::string> metaserver_nodes;
 60     baidu::common::SplitString(FLAGS_metaserver_nodes, ",", &metaserver_nodes);
 61     std::string server_addr = metaserver_nodes[FLAGS_node_index];
 62     std::string listen_addr = std::string("0.0.0.0") + server_addr.substr(server_addr.rfind(':'));
 63     if (!rpc_server.Start(listen_addr)) {
 64         return EXIT_FAILURE;
 65     }
 68     rpc_server.Run();

然后在metaserver_impl.h文件中有些函數(shù)定義中使用的一些成員變量并沒有在相應(yīng)的類聲明中定義,通過metaserver.proto轉(zhuǎn)換后的pb.cc文件中:

 54 service MetaServer {
 55     rpc AddBlock(AddBlockRequest) returns(AddBlockResponse);
 56     rpc SyncBlock(SyncBlockRequest) returns(SyncBlockResponse);
 57     rpc FinishBlock(FinishBlockRequest) returns(FinishBlockResponse);
 58     rpc RemoveBlock(RemoveBlockRequest) returns(RemoveBlockResponse);
 59 }
1175 class MetaServer : public ::google::protobuf::Service {
1176  protected:
1177   // This class should be treated as an abstract interface.
1178   inline MetaServer() {};
1179  public:
1180   virtual ~MetaServer();

1186   virtual void AddBlock(::google::protobuf::RpcController* controller,
1187                        const ::baidu::bfs::metaserver::AddBlockRequest* request,
1188                        ::baidu::bfs::metaserver::AddBlockResponse* response,
1189                        ::google::protobuf::Closure* done);
1190   virtual void SyncBlock(::google::protobuf::RpcController* controller,
1191                        const ::baidu::bfs::metaserver::SyncBlockRequest* request,
1192                        ::baidu::bfs::metaserver::SyncBlockResponse* response,
1193                        ::google::protobuf::Closure* done);
1194   //more code...
 42 class MetaServerImpl : public MetaServer {
 43 public:
 44     MetaServerImpl();
 45     virtual ~MetaServerImpl();
 79 private:
 80     //more code...
 90 private:
 91     ThreadPool* read_thread_pool_;
 92     ThreadPool* work_thread_pool_;
 93     ThreadPool* report_thread_pool_;
 94     ThreadPool* heartbeat_thread_pool_;
 95     ChunkServerManager* chunkserver_manager_;
 96     BlockMappingManager* block_mapping_manager_;
 97 
 98     volatile bool readonly_;
 99     volatile int recover_timeout_;
100     RecoverMode recover_mode_;
101     int64_t start_time_;
102 };

但在類似實現(xiàn)virtual rpc method的時候,有些問題:

 88 void MetaServerImpl::HeartBeat(::google::protobuf::RpcController* controller,
 89                          const HeartBeatRequest* request,
 90                          HeartBeatResponse* response,
 91                          ::google::protobuf::Closure* done) {
 92     if (!is_leader_) { //這里有is_leader???
 93         response->set_status(kIsFollower);
 94         done->Run();
 95         return;
 96     }
 97     g_heart_beat.Inc();
 99     int64_t version = request->namespace_version();
100     if (version == namespace_->Version()) {
101         chunkserver_manager_->HandleHeartBeat(request, response);
102     } else {
103         response->set_status(kVersionError);
104     }
105     response->set_namespace_version(namespace_->Version());
106     done->Run();
107 }

類似的這幾個函數(shù)同nameserver impl類中相同,但在后者的類聲明中有定義相應(yīng)的數(shù)據(jù)成員,而后者有is_leader_之類的是因為raft,而metaserver沒有這個功能,不知道這里是不是拷貝那邊的代碼?沒有用到的功能我這邊不會分析,比如上面說的。

下面正式分析寫流程,因為macbook上沒什么辦公用品,一些圖就不畫了。

寫流程(上)

在bfs_client.cc中,put命令是把本地文件寫到bfs中,如usage中那般put <localfile> <bfsfile> : copy file from local to bfs;部分代碼如下:

211 int BfsPut(baidu::bfs::FS* fs, int argc, char* argv[]) {
212     //check argv...
247     baidu::bfs::File* file;
248     if (fs->OpenFile(target.c_str(), O_WRONLY | O_TRUNC, st.st_mode, &file, baidu::bfs::WriteOptions()) != 0) {
249         //do error logic...
252     }

253     char buf[10240];
254     int64_t len = 0;
255     int32_t bytes = 0;
256     while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {
257         int32_t write_bytes = file->Write(buf, bytes);
258         if (write_bytes < bytes) {
260             ret = 2;
261             break;
262         }
263         len += bytes;
264     }
265     fclose(fp);
266     if (file->Close() != 0) {
269     }
270     //other code...
273 }

以上功能對輸入?yún)?shù)作些檢查,然后OpenFile,以同樣的st_mode值,如果文件存在則截斷為零,然后返回File對象,File實現(xiàn)邏輯復(fù)雜,后續(xù)分析;之后開始file->Write;最后file->Close。從這里分析出,貌似不支持?jǐn)帱c續(xù)傳???這個過程還是比較簡單的。

這里的寫有兩種方式,分別是kWriteChainskWriteFanout,具體點“鏈?zhǔn)綄懹锌赡艽嬖诼?jié)點,一旦鏈中某個結(jié)點速度較慢,則會拖慢整個鏈的寫入速度,現(xiàn)在也支持扇出寫,同時寫四個副本,只要有三個成功就返回成功,可以去規(guī)避慢節(jié)點?!?bfs_qa),具體怎么實現(xiàn)后續(xù)在代碼中分析。

322 int32_t FSImpl::OpenFile(const char* path, int32_t flags, int32_t mode,
323                          File** file, const WriteOptions& options) {
324     *file = NULL;
328     WriteOptions write_option = options;
329     //set options.write_mode
341     CreateFileRequest request;
342     CreateFileResponse response;
343     //set request params
348     bool rpc_ret = nameserver_client_->SendRequest(&NameServer_Stub::CreateFile,
349         &request, &response, 15, 1);
350     if (!rpc_ret || response.status() != kOK) {
353         if (!rpc_ret) {
354             return TIMEOUT;
355         } else {
356             return GetErrorCode(response.status());
357         }
358     } else {
359         *file = new FileImplWrapper(this, rpc_client_, path, flags, write_option);
360     }
361     return OK;
362 }

///proto
 40 message CreateFileRequest {
 41     optional int64 sequence_id = 1;////TODO 0
 42     optional string file_name = 2;
 43     optional int32 mode = 3;
 44     optional int32 flags = 4;
 45     optional int32 replica_num = 5;
 46     optional string user = 7;
 47 }
 48 
 49 message CreateFileResponse {
 50     optional int64 sequence_id = 1;
 51     optional StatusCode status = 2;
 52 }

以上功能是設(shè)置寫的模式,然后設(shè)置請求參數(shù),調(diào)用SendRequest請求哪個service的哪個rpc,帶上request和response,超時時間和重試次數(shù);返回成功后會返回個File對象,關(guān)于protobuf rpc這塊后期有時間分析下原理。

當(dāng)CreateFileRequest請求到達(dá)nameserver服務(wù)上時:

 369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
 370                                 const CreateFileRequest* request,
 371                                 CreateFileResponse* response,
 372                                 ::google::protobuf::Closure* done) {
 373     if (!is_leader_) {
 374         response->set_status(kIsFollower);
 375         done->Run();
 376         return;
 377     }
 378     g_create_file.Inc();
 379     response->set_sequence_id(request->sequence_id());
 380     std::string path = NameSpace::NormalizePath(request->file_name());
 381     int flags = request->flags();
 382     int mode = request->mode();
 383     if (mode == 0) {    
 384         mode = 0644;    // default mode
 385     }
 386     int replica_num = request->replica_num();///副本數(shù)量
 387     NameServerLog log;
 388     std::vector<int64_t> blocks_to_remove;
 389     FileLockGuard file_lock(new WriteLock(path));//TODO 1
 390     StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remov     e, &log);
 391     //more code...
 406 }

以上如果不是leader的話則返回給client并重定向到leader服務(wù)上;然后設(shè)置參數(shù),加鎖(實現(xiàn)后面分析)進行串行化;然后調(diào)用namespace_->CreateFile,后者是一些元數(shù)據(jù),用leveldb存儲:

292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_    num,
293                                  std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294     if (file_name == "/") {
295         return kBadParameter;
296     }
297     FileInfo file_info;
298     std::string fname, info_value;
299     StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300     //more code...
338 }

以上會根據(jù)file_name進行split,由BuildPath處理;BuildPath主要做的事情是:對file_name處理每一級目錄,對每一層目錄名所在的深度進行大端編碼EncodingStoreKey(parent_id, name, &key_str),并在leveldb中查找是否存在,除了最后一個文件名:

254 StatusCode NameSpace::BuildPath(const std::string& path, FileInfo* file_info, std::string* fname    ,
255                                 NameServerLog* log) {
256     std::vector<std::string> paths;
257     if (!common::util::SplitPath(path, &paths)) {
259         return kBadParameter;
260     }
262     /// Find parent directory, create if not exist.
263     int64_t parent_id = kRootEntryid;
264     int depth = paths.size();
265     /// if parent is root,  set "file_info"
266     file_info->set_entry_id(kRootEntryid);
267     std::string info_value;
268     for (int i = 0; i < depth - 1; ++i) {
269         if (!LookUp(parent_id, paths[i], file_info)) {
270             file_info->set_type((1 << 9) | 01755);//drwxr-xr-x
271             file_info->set_ctime(time(NULL));
272             file_info->set_entry_id(common::atomic_add64(&last_entry_id_, 1) + 1);
273             file_info->SerializeToString(&info_value);
274             std::string key_str;
275             EncodingStoreKey(parent_id, paths[i], &key_str);
276             leveldb::Status s = db_->Put(leveldb::WriteOptions(), key_str, info_value);//元數(shù)據(jù)序列化到leveldb
277             assert(s.ok());
278             EncodeLog(log, kSyncWrite, key_str, info_value); //記錄日志   
280         } else {
281             if (GetFileType(file_info->type()) != kDir) {//判斷是否為目錄結(jié)構(gòu)
283                 return kBadParameter;
284             }   
285         }       
286         parent_id = file_info->entry_id();
287     }   
288     *fname = paths[depth - 1]; //文件名
289     return kOK;
290 }

194 bool NameSpace::LookUp(int64_t parent_id, const std::string& name, FileInfo* info) {
195     std::string key_str;
196     EncodingStoreKey(parent_id, name, &key_str);
197     if (!GetFromStore(key_str, info)) {
199         return false;
200     }
202     return true;
203 }

135 bool NameSpace::GetFromStore(const std::string& key, FileInfo* info) {
136     std::string value;
137     leveldb::Status s = db_->Get(leveldb::ReadOptions(), key, &value);
138     if (!s.ok()) {
141         return false;
142     }
143     if (!info->ParseFromString(value)) {
145         return false;
146     }
147     return true;
148 }

其中last_entry_id_是在NameSpace::RebuildBlockMap過程中,從leveldb歷史文件元數(shù)據(jù)中的最大值;然后對每層string一個個處理,如果不存在則編碼并格式化,否則作些校驗,并更新parent_id;上面的過程其實是對path進行編碼;比如一開始leveldb為空的,然后path為/home/dirx/filex,則一共執(zhí)行2次,分別是home為parent_id =1,entry_id=2,dirx為parent_id =2,entry_id=3,filex為文件名(目錄)不處理[因為在client端時,會對source進行檢查,必須是xxx或xxx/xxx而不能是xxx/xxx/,而target可以為諸如xxx/xxxx/或xxx/xxxx];至于為什么要這么編碼,后續(xù)會分析。以上以1home為key,其他數(shù)據(jù)為value。

NameSpace::CreateFile中,經(jīng)過NameSpace::BuildPath返回的file_info為文件名的上一級目錄;然后LookUp(parent_id, fname, &file_info)判斷文件(目錄)是否存在;

292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_    num,
293                                  std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294     //more code...
299     StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300     if (status != kOK) {
301         return status;
302     }
303     int64_t parent_id = file_info.entry_id();
304     bool exist = LookUp(parent_id, fname, &file_info);
305 
306     if (exist) {
307         if ((flags & O_TRUNC) == 0) {//對于存在的文件(目錄)但不截斷則返回錯誤
309             return kFileExists;
310         } else {
311             if (GetFileType(file_info.type()) == kDir) {
313                 return kFileExists;//文件名是目錄則返回
314             }
315             for (int i = 0; i < file_info.blocks_size(); i++) {
316                 blocks_to_remove->push_back(file_info.blocks(i));//即將截斷
317             }
318         }
319     }
320     //設(shè)置file_info數(shù)據(jù)并寫leveldb

其中blocks_to_remove處理的是要截斷的文件,需要刪除block元數(shù)據(jù);其中NameServerLog* log為key和value;
以上只是增加塊數(shù)據(jù)和刪除塊數(shù)據(jù),在leader服務(wù)上,接著:

 391     for (size_t i = 0; i < blocks_to_remove.size(); i++) {
 392         block_mapping_manager_->RemoveBlock(blocks_to_remove[i]);
 393     }

刪除block的信息,至此,并未真正分配block,會在client寫數(shù)據(jù)時才分配,會在下篇分析。

LogRemote做的事情有些復(fù)雜,會在后面分析。這里簡單說一下,這里會判斷是否是ha功能,如果不是則作為異步task等待工作線程執(zhí)行;否則序列化NameServerLog并記錄到raft中,后續(xù)會回調(diào)SyncLogCallback

 408 bool NameServerImpl::LogRemote(const NameServerLog& log,
 409                                std::function<void (bool)> callback) {
 410     if (sync_ == NULL) {
 411         if (callback) {
 412             sync_callback_thread_pool_->AddTask(std::bind(callback, true));
 413         }
 414         return true;
 415     }
 416     std::string logstr;
 417     if (!log.SerializeToString(&logstr)) {
 419     }
 420     if (callback) {
 421         sync_->Log(logstr, callback);
 422         return true;
 423     } else {
 424         return sync_->Log(logstr, FLAGS_log_replicate_timeout * 1000);
 425     }
 426 }

 42 void RaftImpl::Log(const std::string& entry, std::function<void (bool)> callback) {
 43     raft_node_->AppendLog(entry, callback);
 44 }

450 void RaftNodeImpl::AppendLog(const std::string& log, std::function<void (bool)> callback) {
451     MutexLock lock(&mu_);
452     int64_t index = log_index_ + 1;
453     ///TODO: optimize lock
454     if (!StoreLog(current_term_, index, log)) {
455         thread_pool_->AddTask(std::bind(callback,false));
456         return;
457     }
458     callback_map_.insert(std::make_pair(index, callback));
459     log_index_ ++;
460     for (uint32_t i = 0; i < nodes_.size(); i++) {
461         if (follower_context_[i]) {
462             follower_context_[i]->condition.Signal();
463         }
464     }
465 }

在某個時候:

367                         while (last_applied_ < commit_index) {
368                             last_applied_ ++;
370                             std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
371                                 callback_map_.find(last_applied_);
372                             if (cb_it != callback_map_.end()) {
373                                 std::function<void (bool)> callback = cb_it->second;
374                                 callback_map_.erase(cb_it);
375                                 mu_.Unlock();
377                                 callback(true);//執(zhí)行回調(diào)
378                                 mu_.Lock();
379                             } else {
381                             }

TODO
某些具體的技術(shù)實現(xiàn)在上面分析時跳過了,比如為什么數(shù)據(jù)結(jié)構(gòu)這么設(shè)計,鎖的實現(xiàn),還有raft的實現(xiàn)及l(fā)og等后續(xù)補上,下一篇的分析會補上BlockMappingManager/FileLockManager這兩個;還有一些異常處理也沒仔細(xì)考慮。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容