RocketMQ原始碼之DLedger儲存實現DLedgerMmapFileStore

一、前言

前文我們分析了 RocketMQ原始碼之broker高可用CommitLog管理元件DLedgerCommitLog,本文我們分析DLedgerCommitLog中的mmap記憶體對映檔案儲存元件:DLedgerMmapFileStore;

二、原始碼分析

DLedgerMmapFileStore抽象父類DLedgerStore;

DLedgerMmapFileStore成員變數;

DLedgerMmapFileStore建構函式;

flush資料服務執行緒;

清理空間服務執行緒;

檔案記憶體對映儲存實現啟動;

追加資料;

對資料檔案進行截斷;

以follower身份追加資料;

根據索引去查詢一個數據條目;

資料儲存格式及編碼;

1、DLedgerMmapFileStore抽象父類DLedgerStore

// 儲存元件抽象類public abstract class DLedgerStore { // 獲取到當前server的成員狀態 public MemberState getMemberState() { return null; } // 作為leader把一個entry追加到磁盤裡去 public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry); /** * 作為follower把一個entry追加到磁盤裡去 * 再追加的時候,我是需要知道是哪個leader同步了這個entry給我的 * @param entry 日誌條目 * @param leaderTerm leader選舉週期 * @param leaderId leaderid * @return */ public abstract DLedgerEntry appendAsFollower( DLedgerEntry entry, long leaderTerm, String leaderId); // 根據索引獲取日誌entry public abstract DLedgerEntry get(Long index); // 獲取已經提交的index索引 public abstract long getCommittedIndex(); // 在一輪term裡更新已經提交index索引 public void updateCommittedIndex(long term, long committedIndex) { } // 獲取結尾term public abstract long getLedgerEndTerm(); // 獲取結尾index public abstract long getLedgerEndIndex(); // 獲取開始index public abstract long getLedgerBeginIndex(); // 更新結尾index和term protected void updateLedgerEndIndexAndTerm() { if (getMemberState() != null) { getMemberState()。updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm()); } } // 在儲存元件裡發起一次flush操作 public void flush() { } // 對指定的leader term和id發起一個entry的截斷,truncate public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) { return -1; } // 儲存元件啟動 public void startup() { } // 儲存元件停止 public void shutdown() { }}

2、DLedgerMmapFileStore成員變數

// 基於mmap記憶體對映檔案的儲存元件實現,是我們需要重點研究的public class DLedgerMmapFileStore extends DLedgerStore { public static final String CHECK_POINT_FILE = “checkpoint”; public static final String END_INDEX_KEY = “endIndex”; public static final String COMMITTED_INDEX_KEY = “committedIndex”; public static final int MAGIC_1 = 1; public static final int CURRENT_MAGIC = MAGIC_1; public static final int INDEX_UNIT_SIZE = 32; private static Logger logger = LoggerFactory。getLogger(DLedgerMmapFileStore。class); // 追加entries鉤子 public List appendHooks = new ArrayList<>(); // 開始index索引 private long ledgerBeginIndex = -1; // 結束index索引 private long ledgerEndIndex = -1; // 已經提交的index索引 private long committedIndex = -1; // 已經提交的pos位置 private long committedPos = -1; // 結束term條目 private long ledgerEndTerm; // dledger核心配置元件 private DLedgerConfig dLedgerConfig; // server節點成員狀態 private MemberState memberState; // mmap記憶體對映資料檔案list private MmapFileList dataFileList; // mmap記憶體對映索引檔案list private MmapFileList indexFileList; // 執行緒本地副本里面的entry緩衝元件 private ThreadLocal localEntryBuffer; // 執行緒本地副本里面的index緩衝元件 private ThreadLocal localIndexBuffer; // flush資料服務元件 private FlushDataService flushDataService; // 清理空間服務元件 private CleanSpaceService cleanSpaceService; // 磁碟是否已經滿了標識 private volatile boolean isDiskFull = false; // 最近一次檢查點時間戳 private long lastCheckPointTimeMs = System。currentTimeMillis(); // 是否已經載入過的boolean標識 private AtomicBoolean hasLoaded = new AtomicBoolean(false); // 是否已經完成恢復的boolean標識 private AtomicBoolean hasRecovered = new AtomicBoolean(false); // 完整儲存路徑set private volatile Set fullStorePaths = Collections。emptySet();}

3、DLedgerMmapFileStore建構函式

public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState) { // 賦值一個dledger配置元件 this。dLedgerConfig = dLedgerConfig; // 賦值一個server成員狀態元件 this。memberState = memberState; // 我們的dledger資料儲存路徑裡面如果說包含有多路徑分隔符 if (dLedgerConfig。getDataStorePath()。contains(DLedgerConfig。MULTI_PATH_SPLITTER)) { // 把資料檔案list封裝為一個多路徑mmap記憶體對映檔案list this。dataFileList = new MultiPathMmapFileList( dLedgerConfig, dLedgerConfig。getMappedFileSizeForEntryData(), this::getFullStorePaths ); } else { // 把資料檔案list封裝為一個mmap檔案list this。dataFileList = new MmapFileList( dLedgerConfig。getDataStorePath(), dLedgerConfig。getMappedFileSizeForEntryData() ); } // 把索引檔案list封裝成一個mmap記憶體對映檔案list this。indexFileList = new MmapFileList( dLedgerConfig。getIndexStorePath(), dLedgerConfig。getMappedFileSizeForEntryIndex() ); // 執行緒副本的記憶體分配,localentry的記憶體分配,4mb localEntryBuffer = ThreadLocal。withInitial(() -> ByteBuffer。allocate(4 * 1024 * 1024)); // 執行緒副本的記憶體分配,localindex的記憶體分配,一個索引單元大小(32個位元組)* 2 localIndexBuffer = ThreadLocal。withInitial(() -> ByteBuffer。allocate(INDEX_UNIT_SIZE * 2)); // 構建從記憶體裡flush資料到磁碟檔案裡的服務元件 flushDataService = new FlushDataService(“DLedgerFlushDataService”, logger); // 構建清理空間服務元件 cleanSpaceService = new CleanSpaceService(“DLedgerCleanSpaceService”, logger);}

4、flush資料服務執行緒分析

實際flush動作會呼叫MmapFileList#flush方法,RocketMQ原始碼分析之檔案記憶體對映物件層MappedFile核心方法 分析過,不再贅敘;

// flush資料服務class FlushDataService extends ShutdownAbleThread { public FlushDataService(String name, Logger logger) { super(name, logger); } // 他會不斷的週期性的執行,但是支援關閉他 @Override public void doWork() { try { long start = System。currentTimeMillis(); // 他會週期性的去觸發資料檔案的flush動作 DLedgerMmapFileStore。this。dataFileList。flush(0); // 他會週期性的去觸發索引檔案的flush動作 DLedgerMmapFileStore。this。indexFileList。flush(0); long elapsed; if ((elapsed = DLedgerUtils。elapsed(start)) > 500) { logger。info(“Flush data cost={} ms”, elapsed); } // 如果說超過了一個檢查點時間間隔,還需要去發起一次檢查點持久化 if (DLedgerUtils。elapsed(lastCheckPointTimeMs) > dLedgerConfig。getCheckPointInterval()) { persistCheckPoint(); lastCheckPointTimeMs = System。currentTimeMillis(); } // 休眠flush間隔時間 waitForRunning(dLedgerConfig。getFlushFileInterval()); } catch (Throwable t) { logger。info(“Error in {}”, getName(), t); DLedgerUtils。sleep(200); } }}

5、清理空間服務執行緒

// 清理空間服務執行緒class CleanSpaceService extends ShutdownAbleThread { // 獲取到磁碟空間已經使用比例 double storeBaseRatio = DLedgerUtils。getDiskPartitionSpaceUsedPercent( dLedgerConfig。getStoreBaseDir() ); // 資料儲存路徑裡物理佔用比例 double dataRatio = calcDataStorePathPhysicRatio(); public CleanSpaceService(String name, Logger logger) { super(name, logger); } @Override public void doWork() { try { storeBaseRatio = DLedgerUtils。getDiskPartitionSpaceUsedPercent(dLedgerConfig。getStoreBaseDir()); dataRatio = calcDataStorePathPhysicRatio(); long hourOfMs = 3600L * 1000L; long fileReservedTimeMs = dLedgerConfig。getFileReservedHours() * hourOfMs; if (fileReservedTimeMs < hourOfMs) { logger。warn(“The fileReservedTimeMs={} is smaller than hourOfMs={}”, fileReservedTimeMs, hourOfMs); fileReservedTimeMs = hourOfMs; } //If the disk is full, should prevent more data to get in DLedgerMmapFileStore。this。isDiskFull = isNeedForbiddenWrite(); boolean timeUp = isTimeToDelete(); boolean checkExpired = isNeedCheckExpired(); boolean forceClean = isNeedForceClean(); boolean enableForceClean = dLedgerConfig。isEnableDiskForceClean(); int intervalForcibly = 120 * 1000; if (timeUp || checkExpired) { int count = getDataFileList()。deleteExpiredFileByTime( fileReservedTimeMs, 100, intervalForcibly, forceClean && enableForceClean ); if (count > 0 || (forceClean && enableForceClean) || isDiskFull) { logger。info(“Clean space count={} timeUp={} checkExpired={} forceClean={} enableForceClean={} diskFull={} storeBaseRatio={} dataRatio={}”, count, timeUp, checkExpired, forceClean, enableForceClean, isDiskFull, storeBaseRatio, dataRatio); } if (count > 0) { DLedgerMmapFileStore。this。reviseLedgerBeginIndex(); } } getDataFileList()。retryDeleteFirstFile(intervalForcibly); waitForRunning(100); } catch (Throwable t) { logger。info(“Error in {}”, getName(), t); DLedgerUtils。sleep(200); } } private boolean isTimeToDelete() { String when = DLedgerMmapFileStore。this。dLedgerConfig。getDeleteWhen(); if (DLedgerUtils。isItTimeToDo(when)) { return true; } return false; } private boolean isNeedCheckExpired() { if (storeBaseRatio > dLedgerConfig。getDiskSpaceRatioToCheckExpired() || dataRatio > dLedgerConfig。getDiskSpaceRatioToCheckExpired()) { return true; } return false; } private boolean isNeedForceClean() { if (storeBaseRatio > dLedgerConfig。getDiskSpaceRatioToForceClean() || dataRatio > dLedgerConfig。getDiskSpaceRatioToForceClean()) { return true; } return false; } private boolean isNeedForbiddenWrite() { if (storeBaseRatio > dLedgerConfig。getDiskFullRatio() || dataRatio > dLedgerConfig。getDiskFullRatio()) { return true; } return false; } // 計算資料儲存路徑物理比例 public double calcDataStorePathPhysicRatio() { // Set fullStorePath = new HashSet<>(); String storePath = dLedgerConfig。getDataStorePath(); String[] paths = storePath。trim()。split(DLedgerConfig。MULTI_PATH_SPLITTER); double minPhysicRatio = 100; // 遍歷每一個path路徑 for (String path : paths) { double physicRatio = DLedgerUtils。isPathExists(path) ? DLedgerUtils。getDiskPartitionSpaceUsedPercent(path) : -1; minPhysicRatio = Math。min(minPhysicRatio, physicRatio); if (physicRatio > dLedgerConfig。getDiskSpaceRatioToForceClean()) { fullStorePath。add(path); } } DLedgerMmapFileStore。this。setFullStorePaths(fullStorePath); return minPhysicRatio; }}

6、檔案記憶體對映儲存實現啟動

// 對儲存元件可以去執行startup啟動函式@Overridepublic void startup() { // 資料檔案和索引檔案載入 load(); // 資料恢復 recover(); // flush資料服務元件啟動 flushDataService。start(); // 清理空間服務元件 cleanSpaceService。start();}

public void load() { if (!hasLoaded。compareAndSet(false, true)) { return; } // mmap記憶體對映資料檔案載入和mmap記憶體對映索引檔案載入 if (!this。dataFileList。load() || !this。indexFileList。load()) { logger。error(“Load file failed, this usually indicates fatal error, you should check it manually”); System。exit(-1); }}

7、追加資料

// 我們可以把一條資料entry追加到我們的儲存元件裡來@Overridepublic DLedgerEntry appendAsLeader(DLedgerEntry entry) { // 當前節點的狀態是否是Leader,如果不是,則丟擲異常 PreConditions。check(memberState。isLeader(), DLedgerResponseCode。NOT_LEADER); // 當前磁碟是否已滿,其判斷依據是DLedger的根目錄或資料檔案目錄的使用率超過了允許使用的最大值,預設值為85% PreConditions。check(!isDiskFull, DLedgerResponseCode。DISK_FULL); // 從執行緒本地副本里獲取到一個自己當前執行緒的資料緩衝區和索引緩衝區 ByteBuffer dataBuffer = localEntryBuffer。get(); ByteBuffer indexBuffer = localIndexBuffer。get(); // 把entry資料編碼到資料緩衝區裡去 DLedgerEntryCoder。encode(entry, dataBuffer); // 透過資料緩衝區裡面的remaining可以獲取到entry大小 int entrySize = dataBuffer。remaining(); // 對server成員狀態加鎖 synchronized (memberState) { PreConditions。check(memberState。isLeader(), DLedgerResponseCode。NOT_LEADER, null); PreConditions。check(memberState。getTransferee() == null, DLedgerResponseCode。LEADER_TRANSFERRING, null); // 所以說endindex+1了以後,從-1到0,隨著追加資料累加的索引值 long nextIndex = ledgerEndIndex + 1; // 設定一下索引值 entry。setIndex(nextIndex); // 透過server成員狀態獲取到term第幾輪 entry。setTerm(memberState。currTerm()); // 設定魔數 entry。setMagic(CURRENT_MAGIC); // 把累加索引、當前term、魔數,寫入到了資料緩衝區裡去 DLedgerEntryCoder。setIndexTerm( dataBuffer, nextIndex, memberState。currTerm(), CURRENT_MAGIC ); // 我準備把這條資料預追加到我們的資料檔案mmapfiles裡去 long prePos = dataFileList。preAppend(dataBuffer。remaining()); entry。setPos(prePos); PreConditions。check(prePos != -1, DLedgerResponseCode。DISK_ERROR, null); DLedgerEntryCoder。setPos(dataBuffer, prePos); // 在正式寫入資料之前可以回撥我們的追加hook鉤子 for (AppendHook writeHook : appendHooks) { writeHook。doHook(entry, dataBuffer。slice(), DLedgerEntry。BODY_OFFSET); } // 資料檔案mmapfiles追加對應的資料 long dataPos = dataFileList。append( dataBuffer。array(), 0, dataBuffer。remaining() ); PreConditions。check(dataPos != -1, DLedgerResponseCode。DISK_ERROR, null); PreConditions。check(dataPos == prePos, DLedgerResponseCode。DISK_ERROR, null); // 關於dledger索引追加寫入 DLedgerEntryCoder。encodeIndex( dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState。currTerm(), indexBuffer ); // 索引檔案mmapfiles追加一條索引進檔案 long indexPos = indexFileList。append( indexBuffer。array(), 0, indexBuffer。remaining(), false ); PreConditions。check(indexPos == entry。getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode。DISK_ERROR, null); if (logger。isDebugEnabled()) { logger。info(“[{}] Append as Leader {} {}”, memberState。getSelfId(), entry。getIndex(), entry。getBody()。length); } // 每次追加一條資料寫入,寫入完了以後endIndex就會累加 ledgerEndIndex++; ledgerEndTerm = memberState。currTerm(); // 拿到成員狀態的當前term if (ledgerBeginIndex == -1) { ledgerBeginIndex = ledgerEndIndex; } updateLedgerEndIndexAndTerm(); return entry; }}

8、對資料檔案進行截斷

// 對資料檔案做一個截斷,有一部分資料就直接不要了@Overridepublic long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) { PreConditions。check(memberState。isFollower(), DLedgerResponseCode。NOT_FOLLOWER, null); // 獲取到執行緒本地副本里的資料緩衝區和索引緩衝區 ByteBuffer dataBuffer = localEntryBuffer。get(); ByteBuffer indexBuffer = localIndexBuffer。get(); DLedgerEntryCoder。encode(entry, dataBuffer); int entrySize = dataBuffer。remaining(); synchronized (memberState) { PreConditions。check(memberState。isFollower(), DLedgerResponseCode。NOT_FOLLOWER, “role=%s”, memberState。getRole()); PreConditions。check(leaderTerm == memberState。currTerm(), DLedgerResponseCode。INCONSISTENT_TERM, “term %d != %d”, leaderTerm, memberState。currTerm()); PreConditions。check(leaderId。equals(memberState。getLeaderId()), DLedgerResponseCode。INCONSISTENT_LEADER, “leaderId %s != %s”, leaderId, memberState。getLeaderId()); // 直接去根據索引讀取一條資料出來 boolean existedEntry; try { DLedgerEntry tmp = get(entry。getIndex()); existedEntry = entry。equals(tmp); } catch (Throwable ignored) { existedEntry = false; } long truncatePos = existedEntry ? entry。getPos() + entry。getSize() : entry。getPos(); if (truncatePos != dataFileList。getMaxWrotePosition()) { logger。warn(“[TRUNCATE]leaderId={} index={} truncatePos={} != maxPos={}, this is usually happened on the old leader”, leaderId, entry。getIndex(), truncatePos, dataFileList。getMaxWrotePosition()); } // 對這個位置開始的資料發起一個截斷 dataFileList。truncateOffset(truncatePos); if (dataFileList。getMaxWrotePosition() != truncatePos) { logger。warn(“[TRUNCATE] rebuild for data wrotePos: {} != truncatePos: {}”, dataFileList。getMaxWrotePosition(), truncatePos); PreConditions。check(dataFileList。rebuildWithPos(truncatePos), DLedgerResponseCode。DISK_ERROR, “rebuild data truncatePos=%d”, truncatePos); } // 修訂資料檔案mmapfiles的已經flush位置 reviseDataFileListFlushedWhere(truncatePos); if (!existedEntry) { long dataPos = dataFileList。append(dataBuffer。array(), 0, dataBuffer。remaining()); PreConditions。check(dataPos == entry。getPos(), DLedgerResponseCode。DISK_ERROR, “ %d != %d”, dataPos, entry。getPos()); } // 資料檔案做了一個截斷,索引檔案也需要做一個截斷 long truncateIndexOffset = entry。getIndex() * INDEX_UNIT_SIZE; indexFileList。truncateOffset(truncateIndexOffset); if (indexFileList。getMaxWrotePosition() != truncateIndexOffset) { logger。warn(“[TRUNCATE] rebuild for index wrotePos: {} != truncatePos: {}”, indexFileList。getMaxWrotePosition(), truncateIndexOffset); PreConditions。check(indexFileList。rebuildWithPos(truncateIndexOffset), DLedgerResponseCode。DISK_ERROR, “rebuild index truncatePos=%d”, truncateIndexOffset); } reviseIndexFileListFlushedWhere(truncateIndexOffset); DLedgerEntryCoder。encodeIndex(entry。getPos(), entrySize, entry。getMagic(), entry。getIndex(), entry。getTerm(), indexBuffer); long indexPos = indexFileList。append(indexBuffer。array(), 0, indexBuffer。remaining(), false); PreConditions。check(indexPos == entry。getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode。DISK_ERROR, null); ledgerEndTerm = entry。getTerm(); ledgerEndIndex = entry。getIndex(); reviseLedgerBeginIndex(); updateLedgerEndIndexAndTerm(); return entry。getIndex(); }}

9、以follower身份追加資料

// 以follower身份追加資料@Overridepublic DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) { PreConditions。check(memberState。isFollower(), DLedgerResponseCode。NOT_FOLLOWER, “role=%s”, memberState。getRole()); PreConditions。check(!isDiskFull, DLedgerResponseCode。DISK_FULL); // 資料緩衝區和索引緩衝區 ByteBuffer dataBuffer = localEntryBuffer。get(); ByteBuffer indexBuffer = localIndexBuffer。get(); DLedgerEntryCoder。encode(entry, dataBuffer); int entrySize = dataBuffer。remaining(); synchronized (memberState) { PreConditions。check(memberState。isFollower(), DLedgerResponseCode。NOT_FOLLOWER, “role=%s”, memberState。getRole()); long nextIndex = ledgerEndIndex + 1; PreConditions。check(nextIndex == entry。getIndex(), DLedgerResponseCode。INCONSISTENT_INDEX, null); PreConditions。check(leaderTerm == memberState。currTerm(), DLedgerResponseCode。INCONSISTENT_TERM, null); PreConditions。check(leaderId。equals(memberState。getLeaderId()), DLedgerResponseCode。INCONSISTENT_LEADER, null); // 在指定位置裡追加資料進去 long dataPos = dataFileList。append( dataBuffer。array(), 0, dataBuffer。remaining() ); PreConditions。check(dataPos == entry。getPos(), DLedgerResponseCode。DISK_ERROR, “%d != %d”, dataPos, entry。getPos()); DLedgerEntryCoder。encodeIndex( dataPos, entrySize, entry。getMagic(), entry。getIndex(), entry。getTerm(), indexBuffer ); // 追加索引資料 long indexPos = indexFileList。append( indexBuffer。array(), 0, indexBuffer。remaining(), false ); PreConditions。check(indexPos == entry。getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode。DISK_ERROR, null); ledgerEndTerm = entry。getTerm(); ledgerEndIndex = entry。getIndex(); if (ledgerBeginIndex == -1) { ledgerBeginIndex = ledgerEndIndex; } // 更新結尾index和term updateLedgerEndIndexAndTerm(); return entry; }}

10、根據索引去查詢一個數據條目

// 根據索引去查詢一個數據條目@Overridepublic DLedgerEntry get(Long index) { indexCheck(index); // 定義好索引記憶體緩衝片段和資料記憶體緩衝片段 SelectMmapBufferResult indexSbr = null; SelectMmapBufferResult dataSbr = null; try { // 直接透過索引檔案mmapfiles去查詢資料 // index本身其實第幾條索引,每一條索引可以是一個單元是有自己大小,所以定位索引的偏移量 // index*unit_size,從那個位置開始讀取unit_size大小的一條資料 indexSbr = indexFileList。getData( index * INDEX_UNIT_SIZE, INDEX_UNIT_SIZE ); PreConditions。check(indexSbr != null && indexSbr。getByteBuffer() != null, DLedgerResponseCode。DISK_ERROR, “Get null index for %d”, index); indexSbr。getByteBuffer()。getInt(); //magic long pos = indexSbr。getByteBuffer()。getLong(); int size = indexSbr。getByteBuffer()。getInt(); // 根據資料位置和大小,再次從資料檔案mmapfiles裡面讀取出來一條資料就可以了 dataSbr = dataFileList。getData(pos, size); PreConditions。check(dataSbr != null && dataSbr。getByteBuffer() != null, DLedgerResponseCode。DISK_ERROR, “Get null data for %d”, index); // 把這個資料做一個解碼 DLedgerEntry dLedgerEntry = DLedgerEntryCoder。decode(dataSbr。getByteBuffer()); PreConditions。check(pos == dLedgerEntry。getPos(), DLedgerResponseCode。DISK_ERROR, “%d != %d”, pos, dLedgerEntry。getPos()); return dLedgerEntry; } finally { // 把之前讀取的索引和資料的緩衝片段做一個釋放 SelectMmapBufferResult。release(indexSbr); SelectMmapBufferResult。release(dataSbr); }}

11、資料儲存格式及編碼

日誌條目

RocketMQ原始碼之DLedger儲存實現DLedgerMmapFileStore

DLedgerEntryCoder#encode()

/** * 編碼 * @param entry 日誌條目 * @param byteBuffer 緩衝區 */public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) { byteBuffer。clear(); int size = entry。computeSizeInBytes(); //always put magic on the first position // 魔數,4 位元組 byteBuffer。putInt(entry。getMagic()); // 條目總長度,包含Header(協議頭) + 訊息體,佔4 位元組 byteBuffer。putInt(size); // 當前條目的index,佔8 位元組 byteBuffer。putLong(entry。getIndex()); // 當前條目所屬的投票輪次,佔8 位元組 byteBuffer。putLong(entry。getTerm()); // 該條目的物理偏移量,類似於commitlog 檔案的物理偏移量,佔8 位元組 byteBuffer。putLong(entry。getPos()); // 保留欄位,當前版本未使用,佔4 位元組 byteBuffer。putInt(entry。getChannel()); // 當前版本未使用,佔4 位元組 byteBuffer。putInt(entry。getChainCrc()); // body 的CRC 校驗和,用來區分資料是否損壞,佔4 位元組。 byteBuffer。putInt(entry。getBodyCrc()); // 用來儲存body 的長度,佔4 個位元組。 byteBuffer。putInt(entry。getBody()。length); // 具體訊息的內容。 byteBuffer。put(entry。getBody()); byteBuffer。flip();}

日誌索引

RocketMQ原始碼之DLedger儲存實現DLedgerMmapFileStore

DLedgerEntryCoder#encodeIndex()

/** * 日誌索引編碼 * @param pos 日誌條目在檔案的偏移量 * @param size 條目大小 * @param magic 魔數 * @param index 索引 * @param term 投票輪次 * @param byteBuffer 緩衝區 */public static void encodeIndex(long pos, int size, int magic, long index, long term, ByteBuffer byteBuffer) { byteBuffer。clear(); // 魔數,4 位元組 byteBuffer。putInt(magic); // 日誌條目在檔案的偏移量,8位元組 byteBuffer。putLong(pos); // 條目大小,4位元組 byteBuffer。putInt(size); // 日誌條目索引,8位元組 byteBuffer。putLong(index); // 投票輪次,8位元組 byteBuffer。putLong(term); byteBuffer。flip();}