本篇分析寫文件的完整工作流程。在閱讀源碼時,我單獨分析了 src/metaserver/ 下的幾個文件,發現有些疑問。metaserver_main.cc 啟動時會註冊 rpc 服務並處理相應的消息:
45 // rpc_server
46 sofa::pbrpc::RpcServerOptions options;
47
48 sofa::pbrpc::RpcServer rpc_server(options);
49
50 // Server
51 baidu::bfs::metaserver::MetaServer* metaserver_service = new baidu::bfs::metaserver::MetaServerImpl();
52
53 // Register
54 if (!rpc_server.RegisterService(metaserver_service)) {
55 return EXIT_FAILURE;
56 }
57
58 // Start
59 std::vector<std::string> metaserver_nodes;
60 baidu::common::SplitString(FLAGS_metaserver_nodes, ",", &metaserver_nodes);
61 std::string server_addr = metaserver_nodes[FLAGS_node_index];
62 std::string listen_addr = std::string("0.0.0.0") + server_addr.substr(server_addr.rfind(':'));
63 if (!rpc_server.Start(listen_addr)) {
64 return EXIT_FAILURE;
65 }
68 rpc_server.Run();
然後,metaserver 的部分函數實現(例如下文的 MetaServerImpl::HeartBeat)用到了一些成員變量,而這些變量並沒有在 metaserver_impl.h 的類聲明中定義。先看 metaserver.proto 中的服務定義,以及 protoc 由它生成的類聲明(位於 metaserver.pb.h):
54 service MetaServer {
55 rpc AddBlock(AddBlockRequest) returns(AddBlockResponse);
56 rpc SyncBlock(SyncBlockRequest) returns(SyncBlockResponse);
57 rpc FinishBlock(FinishBlockRequest) returns(FinishBlockResponse);
58 rpc RemoveBlock(RemoveBlockRequest) returns(RemoveBlockResponse);
59 }
1175 class MetaServer : public ::google::protobuf::Service {
1176 protected:
1177 // This class should be treated as an abstract interface.
1178 inline MetaServer() {};
1179 public:
1180 virtual ~MetaServer();
1186 virtual void AddBlock(::google::protobuf::RpcController* controller,
1187 const ::baidu::bfs::metaserver::AddBlockRequest* request,
1188 ::baidu::bfs::metaserver::AddBlockResponse* response,
1189 ::google::protobuf::Closure* done);
1190 virtual void SyncBlock(::google::protobuf::RpcController* controller,
1191 const ::baidu::bfs::metaserver::SyncBlockRequest* request,
1192 ::baidu::bfs::metaserver::SyncBlockResponse* response,
1193 ::google::protobuf::Closure* done);
1194 //more code...
42 class MetaServerImpl : public MetaServer {
43 public:
44 MetaServerImpl();
45 virtual ~MetaServerImpl();
79 private:
80 //more code...
90 private:
91 ThreadPool* read_thread_pool_;
92 ThreadPool* work_thread_pool_;
93 ThreadPool* report_thread_pool_;
94 ThreadPool* heartbeat_thread_pool_;
95 ChunkServerManager* chunkserver_manager_;
96 BlockMappingManager* block_mapping_manager_;
97
98 volatile bool readonly_;
99 volatile int recover_timeout_;
100 RecoverMode recover_mode_;
101 int64_t start_time_;
102 };
但在實現這些 virtual rpc 方法時,出現了一些問題:
88 void MetaServerImpl::HeartBeat(::google::protobuf::RpcController* controller,
89 const HeartBeatRequest* request,
90 HeartBeatResponse* response,
91 ::google::protobuf::Closure* done) {
92 if (!is_leader_) { //這里有is_leader???
93 response->set_status(kIsFollower);
94 done->Run();
95 return;
96 }
97 g_heart_beat.Inc();
99 int64_t version = request->namespace_version();
100 if (version == namespace_->Version()) {
101 chunkserver_manager_->HandleHeartBeat(request, response);
102 } else {
103 response->set_status(kVersionError);
104 }
105 response->set_namespace_version(namespace_->Version());
106 done->Run();
107 }
這幾個函數與 NameServerImpl 中的同名函數基本相同,但 NameServerImpl 的類聲明中定義了相應的數據成員。NameServerImpl 有 is_leader_ 之類的成員,是因為它支持 raft,而 metaserver 並沒有這個功能。所以我懷疑這裡的代碼是從 nameserver 拷貝過來的;若這些成員確實沒有在 MetaServerImpl 中聲明,這段代碼應該無法通過編譯。沒有用到的功能(比如上面說的這些)我就不分析了。
下面正式分析寫流程。由於手邊只有一台 MacBook,沒有合適的作圖工具,一些圖就不畫了。
寫流程(上)
在 bfs_client.cc 中,put 命令把本地文件寫入 bfs,用法如 usage 所示:put <localfile> <bfsfile> : copy file from local to bfs。部分代碼如下:
211 int BfsPut(baidu::bfs::FS* fs, int argc, char* argv[]) {
212 //check argv...
247 baidu::bfs::File* file;
248 if (fs->OpenFile(target.c_str(), O_WRONLY | O_TRUNC, st.st_mode, &file, baidu::bfs::WriteOptions()) != 0) {
249 //do error logic...
252 }
253 char buf[10240];
254 int64_t len = 0;
255 int32_t bytes = 0;
256 while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {
257 int32_t write_bytes = file->Write(buf, bytes);
258 if (write_bytes < bytes) {
260 ret = 2;
261 break;
262 }
263 len += bytes;
264 }
265 fclose(fp);
266 if (file->Close() != 0) {
269 }
270 //other code...
273 }
以上代碼先對輸入參數做一些檢查,然後調用 OpenFile 打開目標文件。打開時使用與本地文件相同的 st_mode,並帶上 O_WRONLY | O_TRUNC 標誌,即文件已存在時截斷為零長度。成功後返回一個 File 對象(File 的實現邏輯較複雜,後續分析)。接著循環調用 file->Write 寫入數據,最後調用 file->Close。由於打開時使用了 O_TRUNC,並且總是從文件開頭寫起,看起來 put 並不支持斷點續傳。整個過程還是比較簡單的。
這裡的寫有兩種模式:kWriteChains(鏈式寫)和 kWriteFanout(扇出寫)。引用 bfs_qa 的說法:“鏈式寫有可能存在慢節點,一旦鏈中某個節點速度較慢,則會拖慢整個鏈的寫入速度。現在也支持扇出寫,同時寫四個副本,只要有三個成功就返回成功,可以規避慢節點。”具體如何實現,後續結合代碼分析。
322 int32_t FSImpl::OpenFile(const char* path, int32_t flags, int32_t mode,
323 File** file, const WriteOptions& options) {
324 *file = NULL;
328 WriteOptions write_option = options;
329 //set options.write_mode
341 CreateFileRequest request;
342 CreateFileResponse response;
343 //set request params
348 bool rpc_ret = nameserver_client_->SendRequest(&NameServer_Stub::CreateFile,
349 &request, &response, 15, 1);
350 if (!rpc_ret || response.status() != kOK) {
353 if (!rpc_ret) {
354 return TIMEOUT;
355 } else {
356 return GetErrorCode(response.status());
357 }
358 } else {
359 *file = new FileImplWrapper(this, rpc_client_, path, flags, write_option);
360 }
361 return OK;
362 }
///proto
40 message CreateFileRequest {
41 optional int64 sequence_id = 1;////TODO 0
42 optional string file_name = 2;
43 optional int32 mode = 3;
44 optional int32 flags = 4;
45 optional int32 replica_num = 5;
46 optional string user = 7;
47 }
48
49 message CreateFileResponse {
50 optional int64 sequence_id = 1;
51 optional StatusCode status = 2;
52 }
以上代碼先設置寫模式,再設置請求參數,然後調用 SendRequest。調用時指定要請求的 service 及其 rpc 方法(這裡是 NameServer_Stub::CreateFile),並帶上 request、response、超時時間和重試次數(此處分別為 15 和 1)。返回結果分三種情況:rpc 失敗時返回 TIMEOUT;響應狀態不是 kOK 時返回對應的錯誤碼;成功則創建並返回一個 File 對象(FileImplWrapper)。protobuf rpc 的原理以後有時間再分析。
當 CreateFileRequest 請求到達 nameserver 時:
369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
370 const CreateFileRequest* request,
371 CreateFileResponse* response,
372 ::google::protobuf::Closure* done) {
373 if (!is_leader_) {
374 response->set_status(kIsFollower);
375 done->Run();
376 return;
377 }
378 g_create_file.Inc();
379 response->set_sequence_id(request->sequence_id());
380 std::string path = NameSpace::NormalizePath(request->file_name());
381 int flags = request->flags();
382 int mode = request->mode();
383 if (mode == 0) {
384 mode = 0644; // default mode
385 }
386 int replica_num = request->replica_num();///副本數(shù)量
387 NameServerLog log;
388 std::vector<int64_t> blocks_to_remove;
389 FileLockGuard file_lock(new WriteLock(path));//TODO 1
390 StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remov e, &log);
391 //more code...
406 }
以上代碼中,如果當前節點不是 leader,則直接向 client 返回 kIsFollower(由 client 重定向到 leader)。否則讀取請求參數(mode 為 0 時默認使用 0644),對該路徑加寫鎖(實現後面分析),以串行化對同一路徑的操作,然後調用 namespace_->CreateFile。namespace_ 負責管理文件系統的元數據,這些元數據用 leveldb 存儲:
292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_ num,
293 std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294 if (file_name == "/") {
295 return kBadParameter;
296 }
297 FileInfo file_info;
298 std::string fname, info_value;
299 StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300 //more code...
338 }
NameSpace::CreateFile 先調用 BuildPath 處理 file_name。BuildPath 把路徑切分成各級名字,逐級處理除最後一級(文件名)之外的每一級目錄。對每一級,它用 EncodingStoreKey(parent_id, name, &key_str) 把父目錄的 entry_id(大端編碼)與目錄名拼接成 key,並在 leveldb 中查找該目錄是否存在:
254 StatusCode NameSpace::BuildPath(const std::string& path, FileInfo* file_info, std::string* fname ,
255 NameServerLog* log) {
256 std::vector<std::string> paths;
257 if (!common::util::SplitPath(path, &paths)) {
259 return kBadParameter;
260 }
262 /// Find parent directory, create if not exist.
263 int64_t parent_id = kRootEntryid;
264 int depth = paths.size();
265 /// if parent is root, set "file_info"
266 file_info->set_entry_id(kRootEntryid);
267 std::string info_value;
268 for (int i = 0; i < depth - 1; ++i) {
269 if (!LookUp(parent_id, paths[i], file_info)) {
270 file_info->set_type((1 << 9) | 01755);//drwxr-xr-x
271 file_info->set_ctime(time(NULL));
272 file_info->set_entry_id(common::atomic_add64(&last_entry_id_, 1) + 1);
273 file_info->SerializeToString(&info_value);
274 std::string key_str;
275 EncodingStoreKey(parent_id, paths[i], &key_str);
276 leveldb::Status s = db_->Put(leveldb::WriteOptions(), key_str, info_value);//元數(shù)據(jù)序列化到leveldb
277 assert(s.ok());
278 EncodeLog(log, kSyncWrite, key_str, info_value); //記錄日志
280 } else {
281 if (GetFileType(file_info->type()) != kDir) {//判斷是否為目錄結(jié)構(gòu)
283 return kBadParameter;
284 }
285 }
286 parent_id = file_info->entry_id();
287 }
288 *fname = paths[depth - 1]; //文件名
289 return kOK;
290 }
194 bool NameSpace::LookUp(int64_t parent_id, const std::string& name, FileInfo* info) {
195 std::string key_str;
196 EncodingStoreKey(parent_id, name, &key_str);
197 if (!GetFromStore(key_str, info)) {
199 return false;
200 }
202 return true;
203 }
135 bool NameSpace::GetFromStore(const std::string& key, FileInfo* info) {
136 std::string value;
137 leveldb::Status s = db_->Get(leveldb::ReadOptions(), key, &value);
138 if (!s.ok()) {
141 return false;
142 }
143 if (!info->ParseFromString(value)) {
145 return false;
146 }
147 return true;
148 }
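其中 last_entry_id_ 是在 NameSpace::RebuildBlockMap 過程中,從 leveldb 已有的文件元數據中取得的最大 entry_id。

BuildPath 對路徑的每一級逐個處理:
- 若該級目錄不存在,則為其分配新的 entry_id,設置目錄類型(drwxr-xr-x)和創建時間,把 FileInfo 序列化後寫入 leveldb,並記錄日誌。
- 若已存在,則校驗它確實是目錄,否則返回 kBadParameter。
- 兩種情況最後都把 parent_id 更新為該級目錄的 entry_id。

所以上面的過程其實是對 path 進行編碼。例如一開始 leveldb 為空,path 為 /home/dirx/filex,則循環一共執行 2 次:
- home:parent_id=1,entry_id=2;
- dirx:parent_id=2,entry_id=3;
- filex 是最後一級(文件名),不在這裡處理。[因為在 client 端會對 source 進行檢查,必須是 xxx 或 xxx/xxx,而不能是 xxx/xxx/;而 target 可以是 xxx/xxxx/ 或 xxx/xxxx。]

寫入 leveldb 時,key 是 parent_id 與名字的編碼,概念上即 1+home、2+dirx;value 是序列化後的 FileInfo。至於為什麼要這樣編碼,後續會分析。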
回到 NameSpace::CreateFile。BuildPath 返回後,file_info 對應的是文件名的上一級目錄,取其 entry_id 作為 parent_id;然後調用 LookUp(parent_id, fname, &file_info) 判斷目標文件(或同名目錄)是否已存在:
292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_ num,
293 std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294 //more code...
299 StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300 if (status != kOK) {
301 return status;
302 }
303 int64_t parent_id = file_info.entry_id();
304 bool exist = LookUp(parent_id, fname, &file_info);
305
306 if (exist) {
307 if ((flags & O_TRUNC) == 0) {//對于存在的文件(目錄)但不截斷則返回錯誤
309 return kFileExists;
310 } else {
311 if (GetFileType(file_info.type()) == kDir) {
313 return kFileExists;//文件名是目錄則返回
314 }
315 for (int i = 0; i < file_info.blocks_size(); i++) {
316 blocks_to_remove->push_back(file_info.blocks(i));//即將截斷
317 }
318 }
319 }
320 //設(shè)置file_info數(shù)據(jù)并寫leveldb
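若目標已存在,分三種情況:
- 沒有指定 O_TRUNC 時,返回 kFileExists;
- 目標是目錄時,同樣返回 kFileExists;
- 否則(普通文件且帶 O_TRUNC),文件即將被截斷,把它原有的全部 block id 放入 blocks_to_remove,這些 block 的元數據隨後需要刪除。

NameServerLog* log 則記錄了本次寫入 leveldb 的 key 和 value。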
以上只是在 namespace 中更新了元數據,並收集了需要刪除的 block。回到 leader 上的 NameServerImpl::CreateFile,接著:
391 for (size_t i = 0; i < blocks_to_remove.size(); i++) {
392 block_mapping_manager_->RemoveBlock(blocks_to_remove[i]);
393 }
即通過 block_mapping_manager_ 刪除這些 block 的信息。至此並沒有真正分配 block,block 要等 client 寫數據時才分配,這部分在下篇分析。
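LogRemote 做的事情有些複雜,後面再詳細分析,這裡先簡單說明。它先判斷是否開啟了 HA,即 sync_ 是否為 NULL:
- 若未開啟,則把 callback(參數為 true)作為異步 task 交給 sync_callback_thread_pool_ 執行。
- 否則序列化 NameServerLog 並寫入 raft。有 callback 時異步提交,日誌被應用後再回調 callback(例如 SyncLogCallback);沒有 callback 時則同步等待,超時由 FLAGS_log_replicate_timeout 控制: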
408 bool NameServerImpl::LogRemote(const NameServerLog& log,
409 std::function<void (bool)> callback) {
410 if (sync_ == NULL) {
411 if (callback) {
412 sync_callback_thread_pool_->AddTask(std::bind(callback, true));
413 }
414 return true;
415 }
416 std::string logstr;
417 if (!log.SerializeToString(&logstr)) {
419 }
420 if (callback) {
421 sync_->Log(logstr, callback);
422 return true;
423 } else {
424 return sync_->Log(logstr, FLAGS_log_replicate_timeout * 1000);
425 }
426 }
42 void RaftImpl::Log(const std::string& entry, std::function<void (bool)> callback) {
43 raft_node_->AppendLog(entry, callback);
44 }
450 void RaftNodeImpl::AppendLog(const std::string& log, std::function<void (bool)> callback) {
451 MutexLock lock(&mu_);
452 int64_t index = log_index_ + 1;
453 ///TODO: optimize lock
454 if (!StoreLog(current_term_, index, log)) {
455 thread_pool_->AddTask(std::bind(callback,false));
456 return;
457 }
458 callback_map_.insert(std::make_pair(index, callback));
459 log_index_ ++;
460 for (uint32_t i = 0; i < nodes_.size(); i++) {
461 if (follower_context_[i]) {
462 follower_context_[i]->condition.Signal();
463 }
464 }
465 }
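之後,當 commit_index 向前推進時,apply 循環會逐條推進 last_applied_。每推進一條,它在 callback_map_ 中找到該 index 上登記的回調並將其移除,然後在釋放 mu_ 的情況下以 true 調用它: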
367 while (last_applied_ < commit_index) {
368 last_applied_ ++;
370 std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
371 callback_map_.find(last_applied_);
372 if (cb_it != callback_map_.end()) {
373 std::function<void (bool)> callback = cb_it->second;
374 callback_map_.erase(cb_it);
375 mu_.Unlock();
377 callback(true);//執(zhí)行回調(diào)
378 mu_.Lock();
379 } else {
381 }
TODO
上面的分析跳過了某些具體的技術實現,比如數據結構為什麼這樣設計、鎖的實現、raft 的實現及 log 等,後續再補上。下一篇會補充 BlockMappingManager 和 FileLockManager 這兩部分。另外,一些異常處理也還沒有仔細考慮。