前言
因?yàn)槲覀兂S玫腞xjava,所以這里會(huì)結(jié)合RxRoom做分析,所以需要你有Rxjava相關(guān)的知識(shí)儲(chǔ)備。
完整源碼參見googlesamples,可以自己跑一下,體驗(yàn)一下。
簡(jiǎn)單用法
我在閱讀一份代碼時(shí)最喜歡的入手點(diǎn)就是先看怎么用,從用法去往前推導(dǎo)。
看下面的簡(jiǎn)單代碼,也就是所謂的三大組件。
//首先是實(shí)體類對(duì)應(yīng)到數(shù)據(jù)庫(kù)也就是一個(gè)表
@Entity(tableName = "users")
public class User {
@PrimaryKey
@ColumnInfo(name = "userid")
private String mId;
@ColumnInfo(name = "username")
private String mUserName;
...
}
//數(shù)據(jù)操作類
@Dao
public interface UserDao {
@Query("SELECT * FROM users LIMIT 1")
Flowable<User> getUser();
@Insert(onConflict = OnConflictStrategy.REPLACE)
void insertUser(User user);
...
}
//Database
@Database(
entities = {User.class,UserGroup.class},
version = 1)
public abstract class UsersDatabase extends RoomDatabase {
private static volatile UsersDatabase INSTANCE;
public abstract UserDao userDao();
public static UsersDatabase getInstance(Context context) {
if (INSTANCE == null) {
synchronized (UsersDatabase.class) {
if (INSTANCE == null) {
INSTANCE =
Room.databaseBuilder(
context.getApplicationContext(), UsersDatabase.class, "Sample.db")
.build();
}
}
}
return INSTANCE;
}
}
定義好上面的3個(gè)東西,我們就能直接用來增刪改查了。
val database = UsersDatabase.getInstance(this)
//插入一條數(shù)據(jù)
val user = User("kingty")
database.userDao().insertUser(user)
//監(jiān)聽一個(gè)查詢的實(shí)時(shí)變化
database.userDao().getUser().subscribe {
Log.d("TAG", "table changed => " + it.userName + " => " + it.id)
}
用法非常簡(jiǎn)單,做一些注解,它就自動(dòng)幫你完成了DAO中的所有操作,并且還可以監(jiān)聽數(shù)據(jù)庫(kù)的變化實(shí)時(shí)更新數(shù)據(jù)。這一切看起來比較夢(mèng)幻。那么問題來了,它是怎么做到這一切的?
怎么通過注解就可以操作數(shù)據(jù)庫(kù)了?
其實(shí)ORM的本質(zhì)就是用一些手段把你的數(shù)據(jù)類和操作變成SQL語(yǔ)句而已。而這一切無非就是兩種手段,編譯時(shí)做(apt)或者運(yùn)行時(shí)做(reflect)。出于效率考慮現(xiàn)在大部分都是編譯時(shí)做。編譯以下,所以我們來翻一翻build目錄,看一下Room給我們生成了一些什么東西。
- build
- generate
- source
- apt
- com.kingty.roomtest
- UserDao_Impl.java
- UsersDatabase_Impl.java
- com.kingty.roomtest
- apt
- source
- generate
就發(fā)現(xiàn)在上面這個(gè)目錄下生成了兩個(gè)類,這里我們不深究這里是怎么生成這兩個(gè)類的,說起來可以說兩天兩夜。其實(shí)是我也不懂。有興趣的同學(xué)應(yīng)該早就知道了,沒興趣的我說不說也無所謂了??傊褪?,編譯的時(shí)候通過我們剛才那些個(gè)注解給我們生成了真正的實(shí)現(xiàn)類,幫助我們完成了我們想要的操作。
來讓我們看一下里面都生成了一些什么?
先看一下UserDao_Impl.java這個(gè)類做了什么?
x@SuppressWarnings("unchecked")
public final class UserDao_Impl implements UserDao {
private final RoomDatabase __db;
private final EntityInsertionAdapter __insertionAdapterOfUser;
private final SharedSQLiteStatement __preparedStmtOfDeleteAllUsers;
public UserDao_Impl(RoomDatabase __db) {
this.__db = __db;
this.__insertionAdapterOfUser = new EntityInsertionAdapter<User>(__db) {
@Override
public String createQuery() {
return "INSERT OR REPLACE INTO `users`(`userid`,`username`) VALUES (?,?)";
}
@Override
public void bind(SupportSQLiteStatement stmt, User value) {
if (value.getId() == null) {
stmt.bindNull(1);
} else {
stmt.bindString(1, value.getId());
}
if (value.getUserName() == null) {
stmt.bindNull(2);
} else {
stmt.bindString(2, value.getUserName());
}
}
};
this.__preparedStmtOfDeleteAllUsers = new SharedSQLiteStatement(__db) {
@Override
public String createQuery() {
final String _query = "DELETE FROM Users";
return _query;
}
};
}
@Override
public void insertUser(User user) {
__db.beginTransaction();
try {
__insertionAdapterOfUser.insert(user);
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
}
}
@Override
public void deleteAllUsers() {
final SupportSQLiteStatement _stmt = __preparedStmtOfDeleteAllUsers.acquire();
__db.beginTransaction();
try {
_stmt.executeUpdateDelete();
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
__preparedStmtOfDeleteAllUsers.release(_stmt);
}
}
@Override
public Flowable<User> getUser() {
final String _sql = "SELECT * FROM users LIMIT 1";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
return RxRoom.createFlowable(__db, new String[]{"users"}, new Callable<User>() {
@Override
public User call() throws Exception {
final Cursor _cursor = DBUtil.query(__db, _statement, false);
try {
final int _cursorIndexOfMId = _cursor.getColumnIndexOrThrow("userid");
final int _cursorIndexOfMUserName = _cursor.getColumnIndexOrThrow("username");
final User _result;
if(_cursor.moveToFirst()) {
final String _tmpMId;
_tmpMId = _cursor.getString(_cursorIndexOfMId);
final String _tmpMUserName;
_tmpMUserName = _cursor.getString(_cursorIndexOfMUserName);
_result = new User(_tmpMId,_tmpMUserName);
} else {
_result = null;
}
return _result;
} finally {
_cursor.close();
}
}
@Override
protected void finalize() {
_statement.release();
}
});
}
}
首先,在初始化UserDao_Impl的時(shí)候通過EntityInsertionAdapter幫你創(chuàng)建了插入數(shù)據(jù)的adapter,里面有插入數(shù)據(jù)的sql模板,和綁定數(shù)據(jù)的方法。通俗一點(diǎn)說就是在這里幫你拼好了插入數(shù)據(jù)的SQL語(yǔ)句。接下來就是幫你實(shí)現(xiàn)了你在UserDao中定義的接口。
增刪改的套路大概類似,都是__db.beginTransaction();然后執(zhí)行拼好的SQL語(yǔ)句,然后__db.endTransaction();注意此DB非彼DB,這個(gè)DB是封裝過的RoomDatabase。后面我們會(huì)著重講一下這個(gè)beginTransaction和endTransaction,他們還是比較重要的一個(gè)環(huán)節(jié)。 重要環(huán)節(jié)一。
查的套路就不一樣了,為什么它不一樣,public Flowable<User> getUser()這個(gè)方法明顯看起來大坨一些,這就是它為什么不一樣。簡(jiǎn)單閱讀一下,它其實(shí)利用RxRoom創(chuàng)建了一個(gè)Flowable,其中有3個(gè)參數(shù)我們注意一下,第一個(gè)是__db也就是database,第二個(gè)是new String[]{"users"},是一個(gè)表名的數(shù)組,第3個(gè)就是查詢完成之后組裝成User的回調(diào)。特別注意的是第二個(gè)參數(shù),這個(gè)表名的數(shù)組的作用。這是重要環(huán)節(jié)二。
再看看UsersDatabase_Impl.java這個(gè)類的生成了什么?
@SuppressWarnings("unchecked")
public final class UsersDatabase_Impl extends UsersDatabase {
private volatile UserDao _userDao;
@Override
protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) {
final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) {
@Override
public void createAllTables(SupportSQLiteDatabase _db) {
_db.execSQL("CREATE TABLE IF NOT EXISTS `users` (`userid` TEXT NOT NULL, `username` TEXT, PRIMARY KEY(`userid`))");
_db.execSQL("CREATE TABLE IF NOT EXISTS `usergroups` (`userGroupId` TEXT NOT NULL, `groupName` TEXT, PRIMARY KEY(`userGroupId`))");
_db.execSQL("CREATE TABLE IF NOT EXISTS room_master_table (id INTEGER PRIMARY KEY,identity_hash TEXT)");
_db.execSQL("INSERT OR REPLACE INTO room_master_table (id,identity_hash) VALUES(42, \"8890a9730e4846f27da03382221fc877\")");
}
@Override
public void dropAllTables(SupportSQLiteDatabase _db) {
_db.execSQL("DROP TABLE IF EXISTS `users`");
_db.execSQL("DROP TABLE IF EXISTS `usergroups`");
}
@Override
protected void onCreate(SupportSQLiteDatabase _db) {
if (mCallbacks != null) {
for (int _i = 0, _size = mCallbacks.size(); _i < _size; _i++) {
mCallbacks.get(_i).onCreate(_db);
}
}
}
@Override
public void onOpen(SupportSQLiteDatabase _db) {
mDatabase = _db;
internalInitInvalidationTracker(_db);
if (mCallbacks != null) {
for (int _i = 0, _size = mCallbacks.size(); _i < _size; _i++) {
mCallbacks.get(_i).onOpen(_db);
}
}
}
@Override
public void onPreMigrate(SupportSQLiteDatabase _db) {
DBUtil.dropFtsSyncTriggers(_db);
}
@Override
public void onPostMigrate(SupportSQLiteDatabase _db) {
}
@Override
protected void validateMigration(SupportSQLiteDatabase _db) {
final HashMap<String, TableInfo.Column> _columnsUsers = new HashMap<String, TableInfo.Column>(2);
_columnsUsers.put("userid", new TableInfo.Column("userid", "TEXT", true, 1));
_columnsUsers.put("username", new TableInfo.Column("username", "TEXT", false, 0));
final HashSet<TableInfo.ForeignKey> _foreignKeysUsers = new HashSet<TableInfo.ForeignKey>(0);
final HashSet<TableInfo.Index> _indicesUsers = new HashSet<TableInfo.Index>(0);
final TableInfo _infoUsers = new TableInfo("users", _columnsUsers, _foreignKeysUsers, _indicesUsers);
final TableInfo _existingUsers = TableInfo.read(_db, "users");
if (! _infoUsers.equals(_existingUsers)) {
throw new IllegalStateException("Migration didn't properly handle users(com.kingty.roomtest.User).\n"
+ " Expected:\n" + _infoUsers + "\n"
+ " Found:\n" + _existingUsers);
}
final HashMap<String, TableInfo.Column> _columnsUsergroups = new HashMap<String, TableInfo.Column>(2);
_columnsUsergroups.put("userGroupId", new TableInfo.Column("userGroupId", "TEXT", true, 1));
_columnsUsergroups.put("groupName", new TableInfo.Column("groupName", "TEXT", false, 0));
final HashSet<TableInfo.ForeignKey> _foreignKeysUsergroups = new HashSet<TableInfo.ForeignKey>(0);
final HashSet<TableInfo.Index> _indicesUsergroups = new HashSet<TableInfo.Index>(0);
final TableInfo _infoUsergroups = new TableInfo("usergroups", _columnsUsergroups, _foreignKeysUsergroups, _indicesUsergroups);
final TableInfo _existingUsergroups = TableInfo.read(_db, "usergroups");
if (! _infoUsergroups.equals(_existingUsergroups)) {
throw new IllegalStateException("Migration didn't properly handle usergroups(com.kingty.roomtest.UserGroup).\n"
+ " Expected:\n" + _infoUsergroups + "\n"
+ " Found:\n" + _existingUsergroups);
}
}
}, "8890a9730e4846f27da03382221fc877", "1fdb937160bfb054175cfe5daf922b3b");
final SupportSQLiteOpenHelper.Configuration _sqliteConfig = SupportSQLiteOpenHelper.Configuration.builder(configuration.context)
.name(configuration.name)
.callback(_openCallback)
.build();
final SupportSQLiteOpenHelper _helper = configuration.sqliteOpenHelperFactory.create(_sqliteConfig);
return _helper;
}
@Override
protected InvalidationTracker createInvalidationTracker() {
final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "users","usergroups");
}
@Override
public void clearAllTables() {
super.assertNotMainThread();
final SupportSQLiteDatabase _db = super.getOpenHelper().getWritableDatabase();
try {
super.beginTransaction();
_db.execSQL("DELETE FROM `users`");
_db.execSQL("DELETE FROM `usergroups`");
super.setTransactionSuccessful();
} finally {
super.endTransaction();
_db.query("PRAGMA wal_checkpoint(FULL)").close();
if (!_db.inTransaction()) {
_db.execSQL("VACUUM");
}
}
}
@Override
public UserDao userDao() {
if (_userDao != null) {
return _userDao;
} else {
synchronized(this) {
if(_userDao == null) {
_userDao = new UserDao_Impl(this);
}
return _userDao;
}
}
}
}
這個(gè)類中邏輯比較清晰。首先是創(chuàng)建了一個(gè)SupportSQLiteOpenHelper來幫你拼了一些必要的SQL語(yǔ)句,比如create table,drop table,open和migrate遷移數(shù)據(jù)等等。后面還有一個(gè)初始化真正的DAO實(shí)現(xiàn)類UserDao_Impl和刪除相關(guān)的表數(shù)據(jù)的方法clearAllTables
這些都是一些比較好理解的。然后我們會(huì)看到一個(gè)我們不好理解的方法createInvalidationTracker (),這個(gè)是用來做什么的?我們先把這個(gè)疑問留下,叫做 重要環(huán)節(jié)三
正式初略閱讀Room的源碼
下面我們提出幾個(gè)問題:
- 在上面生成的代碼中我們看到操作的執(zhí)行都是通過一個(gè)叫
RoomDatabase的_db來做的,那RoomDatabase是什么? - 重要環(huán)節(jié)一,在執(zhí)行前后
beginTransaction和endTransaction做了什么? - 重要環(huán)節(jié)二,創(chuàng)建
Flowable的時(shí)候做了什么,為什么需要table names? - 重要環(huán)節(jié)三,
createInvalidationTracker()是做什么用的? - 最后,串聯(lián)起上面的問題,當(dāng)一個(gè)表發(fā)生更改,監(jiān)聽一個(gè)查詢的實(shí)時(shí)變化是怎么做到的?
帶著這幾個(gè)問題,我們大體的去閱讀一下源碼。閱讀過程中我們有一個(gè)原則,就是先不要特別在意細(xì)節(jié),先捋通大概的邏輯流程。如果你對(duì)細(xì)節(jié)感興趣,再去扣細(xì)節(jié)。
在項(xiàng)目中加以下引用
implementation 'androidx.room:room-runtime:2.1.0-alpha01'
annotationProcessor 'androidx.room:room-compiler:2.1.0-alpha01'
implementation 'androidx.room:room-rxjava2:2.1.0-alpha01'
編譯之后我們可以在External Libraries目錄下看到以下幾個(gè)包:
- androidx.room:room-commom
- androidx.room:room-runtime
- androidx.room:room-rxjava
- androidx.sqlite:sqlite
- androidx.sqlite:sqlite-framework
我先簡(jiǎn)單的介紹下這幾個(gè)包大概是做什么的。
androidx.sqlite:sqlite這個(gè)包主要是重新定義了一層SQLite的Support接口。
androidx.sqlite:sqlite-framework這個(gè)包主要是利用原有的android的Sqlite相關(guān)的API實(shí)現(xiàn)了上面定義的接口。
這兩個(gè)包主要是對(duì)原有的API做了一層代理封裝,我的理解是便于擴(kuò)展。因此我們?cè)诳?code>Room代碼的時(shí)候這部分代碼大概瀏覽一下就OK,不必深究。
androidx.room:room-commom包中定義了一些公共的屬性,和我們用到的所有的注解。
androidx.room:room-runtime是我們需要主要閱讀的邏輯所在的包,Room的核心邏輯都在這個(gè)包中。
androidx.room:room-rxjava當(dāng)我們需要返回一個(gè)Rx包裝過的結(jié)果的時(shí)候,需要這個(gè)包。里面就是一個(gè)重要類RxRoom.java用來把Query包裝成一個(gè)可觀察的對(duì)象。
下面我們帶著上面的問題來看一下代碼。
RoomDatabase是做什么的?
代碼太長(zhǎng)就不全貼,我們看一下它持有的成員變量
protected volatile SupportSQLiteDatabase mDatabase;
private Executor mQueryExecutor;
private SupportSQLiteOpenHelper mOpenHelper;
private final InvalidationTracker mInvalidationTracker;
private boolean mAllowMainThreadQueries;
boolean mWriteAheadLoggingEnabled;
它其實(shí)是對(duì)數(shù)據(jù)庫(kù)的進(jìn)一步封裝,利用真正的SupportSQLiteDatabase和你UsersDatabase_Impl自動(dòng)生成的createOpenHelper()提供的SupportSQLiteOpenHelper來操作數(shù)據(jù)庫(kù)。進(jìn)一步封裝了Transcation并封裝了一些其他的邏輯。
beginTransaction和endTransaction做了什么?
實(shí)際上這也是上一個(gè)問題的一部分,先看看beginTransaction的代碼
/**
* Wrapper for {@link SupportSQLiteDatabase#beginTransaction()}.
*/
public void beginTransaction() {
assertNotMainThread();//禁止主線程執(zhí)行
SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();//拿到真正的數(shù)據(jù)庫(kù)對(duì)象
mInvalidationTracker.syncTriggers(database);//??
database.beginTransaction();//開啟事務(wù)
}
從上面的代碼來看其他3句都非常好理解,正常的數(shù)據(jù)庫(kù)事務(wù)流程,但是在開啟事務(wù)之前做了一個(gè)操作mInvalidationTracker.syncTriggers(database);
我們先不忙解釋這個(gè)是什么意思。我們?cè)诳纯?code>endTransaction
/**
* Wrapper for {@link SupportSQLiteDatabase#endTransaction()}.
*/
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();//結(jié)束事務(wù)
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
第一句是正常的結(jié)束事務(wù)的語(yǔ)句,但是結(jié)束之后等待最后一個(gè)事務(wù)結(jié)束,會(huì)做一個(gè)操作mInvalidationTracker.refreshVersionsAsync();
也就是說在開啟事務(wù)之前,結(jié)束事務(wù)之后都調(diào)用了InvalidationTracker做了一些邏輯,再結(jié)合上面的第四個(gè)問題,重要環(huán)節(jié)三createInvalidationTracker()是做什么用的?,一切問題都指向了InvalidationTracker。
InvalidationTracker這個(gè)類是什么作用,我們先從上面的第四個(gè)問題看起。
createInvalidationTracker()是做什么用的?
下面是在自動(dòng)生成的UsersDatabase_Impl.java類中的方法
@Override
protected InvalidationTracker createInvalidationTracker() {
final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "users","usergroups");
}
在RoomDatabase在被初始化的時(shí)候調(diào)用這個(gè)方法賦值給成員變量
/**
* Creates a RoomDatabase.
* <p>
* You cannot create an instance of a database, instead, you should acquire it via
* {@link Room#databaseBuilder(Context, Class, String)} or
* {@link Room#inMemoryDatabaseBuilder(Context, Class)}.
*/
public RoomDatabase() {
mInvalidationTracker = createInvalidationTracker();
}
因此回答上面的問題就是createInvalidationTracker()給RoomDatabase提供了一個(gè)mInvalidationTracker實(shí)例。
mInvalidationTracker起的作用是什么?
我們來看一下RoomDatabase.java中的調(diào)用流程。
1.初始化
public RoomDatabase() {
mInvalidationTracker = createInvalidationTracker();
}
2.init中調(diào)用mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,configuration.name);
@CallSuper
public void init(@NonNull DatabaseConfiguration configuration) {
mOpenHelper = createOpenHelper(configuration);
boolean wal = false;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN) {
wal = configuration.journalMode == JournalMode.WRITE_AHEAD_LOGGING;
mOpenHelper.setWriteAheadLoggingEnabled(wal);
}
mCallbacks = configuration.callbacks;
mQueryExecutor = configuration.queryExecutor;
mAllowMainThreadQueries = configuration.allowMainThreadQueries;
mWriteAheadLoggingEnabled = wal;
if (configuration.multiInstanceInvalidation) {
mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,
configuration.name);
}
}
3.每次Transaction 開始之前調(diào)用mInvalidationTracker.syncTriggers
/**
* Wrapper for {@link SupportSQLiteDatabase#beginTransaction()}.
*/
public void beginTransaction() {
assertNotMainThread();
SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();
mInvalidationTracker.syncTriggers(database);
database.beginTransaction();
}
4. 最后一個(gè)Transaction 結(jié)束之后調(diào)用mInvalidationTracker.refreshVersionsAsync
/**
* Wrapper for {@link SupportSQLiteDatabase#endTransaction()}.
*/
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
5,close數(shù)據(jù)庫(kù)的時(shí)候mInvalidationTracker.stopMultiInstanceInvalidation();與第2步對(duì)應(yīng)。
/**
* Closes the database if it is already open.
*/
public void close() {
if (isOpen()) {
try {
mCloseLock.lock();
mInvalidationTracker.stopMultiInstanceInvalidation();
mOpenHelper.close();
} finally {
mCloseLock.unlock();
}
}
}
上面就是InvalidationTracker在RoomDatabase中的整個(gè)生命周期中的調(diào)用情況。從代碼上來看它其實(shí)是在track整個(gè)數(shù)據(jù)的更改情況,因?yàn)樗诿總€(gè)transcation前后做了一些調(diào)用。結(jié)合上面最后的一個(gè)問題當(dāng)一個(gè)表發(fā)生更改,監(jiān)聽一個(gè)查詢的實(shí)時(shí)變化是怎么做到的。大概可以猜測(cè)出來這個(gè)類的主要作用是來保證數(shù)據(jù)發(fā)生更改的時(shí)候,保證可以通知到這個(gè)表上其他的Query。
怎么樣實(shí)現(xiàn)的監(jiān)聽?
我們發(fā)現(xiàn)上面還有一個(gè)問題我們還沒有提到重要環(huán)節(jié)二,創(chuàng)建Flowable的時(shí)候做了什么,為什么需要table names?
我們從這里入手講起。先看下面RxRoom中的代碼
public static Flowable<Object> createFlowable(final RoomDatabase database,
final String... tableNames) {
return Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
tableNames) {
@Override
public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
if (!emitter.isCancelled()) {
emitter.onNext(NOTHING);
}
}
};
if (!emitter.isCancelled()) {
database.getInvalidationTracker().addObserver(observer);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
database.getInvalidationTracker().removeObserver(observer);
}
}));
}
// emit once to avoid missing any data and also easy chaining
if (!emitter.isCancelled()) {
emitter.onNext(NOTHING);
}
}
}, BackpressureStrategy.LATEST);
}
/**
* Helper method used by generated code to bind a Callable such that it will be run in
* our disk io thread and will automatically block null values since RxJava2 does not like null.
*
* @hide
*/
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static <T> Flowable<T> createFlowable(final RoomDatabase database,
final String[] tableNames, final Callable<T> callable) {
Scheduler scheduler = Schedulers.from(database.getQueryExecutor());
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createFlowable(database, tableNames)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
}
在上面生成的UserDao_Impl.java類中getUser()這個(gè)方法中調(diào)用的createFlowable這個(gè)方法,也就是上面的第二個(gè)方法,它實(shí)際上調(diào)用的上面的第一個(gè)方法flatmap到這個(gè)本次的查詢。也就是說只要第一個(gè)方法中的Flowable發(fā)射一次數(shù)據(jù),那么這個(gè)查詢就會(huì)執(zhí)行一次,并返回結(jié)果(也就是執(zhí)行這個(gè)callable)。這里應(yīng)該就能看出一點(diǎn)端倪,其實(shí)第一個(gè)方法就是創(chuàng)建出來一個(gè)觀察這個(gè)表變化的觀察者InvalidationTracker.Observer并把它添加到InvalidationTracker的觀察者列表中去,因?yàn)橐粋€(gè)表肯定不止一個(gè)觀察者,所有的Query應(yīng)該都需要觀察表的更改。也就是上面的這行代碼database.getInvalidationTracker().addObserver(observer);到這里RxRoom這個(gè)類的使命就完成了,他就是這樣一個(gè)簡(jiǎn)單的功能,后面你也不需要再關(guān)心它。
InvalidationTracker.Observer是一個(gè)靜態(tài)類,就注意一下其中的一個(gè)方法
/**
* Called when one of the observed tables is invalidated in the database.
*
* @param tables A set of invalidated tables. This is useful when the observer targets
* multiple tables and you want to know which table is invalidated. This will
* be names of underlying tables when you are observing views.
public abstract void onInvalidated(@NonNull Set<String> tables);
從備注上已經(jīng)寫的很清楚了,就是表發(fā)生更改狀態(tài)的時(shí)候會(huì)調(diào)用這個(gè)方法,emitter就會(huì)發(fā)射數(shù)據(jù),通知Query去requery.
我們著重看一下addObserver這個(gè)方法干了什么?
@WorkerThread
public void addObserver(@NonNull Observer observer) {
final String[] tableNames = resolveViews(observer.mTables);
int[] tableIds = new int[tableNames.length];
final int size = tableNames.length;
long[] versions = new long[tableNames.length];
// TODO sync versions ?
for (int i = 0; i < size; i++) {
Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
if (tableId == null) {
throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
}
tableIds[i] = tableId;
versions[i] = mMaxVersion;
}
ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames, versions);
ObserverWrapper currentObserver;
synchronized (mObserverMap) {
currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
}
if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
syncTriggers();
}
}
首先對(duì)Observer做了一層包裝,主要就是包裝了當(dāng)表發(fā)生變化的時(shí)候通過各種方式去通知也就是執(zhí)行mObserver.onInvalidated(invalidatedTables);,接下來,把包裝后的wrapper放進(jìn)map里。然后在滿足特定條件下會(huì)執(zhí)行syncTriggers();這個(gè)似曾相識(shí),在上面RoomDatabase開始一個(gè)事務(wù)之前也執(zhí)行這個(gè)方法。我們來仔細(xì)看看這個(gè)方法做了什么。
void syncTriggers(SupportSQLiteDatabase database) {
if (database.inTransaction()) {
// we won't run this inside another transaction.
return;
}
try {
// This method runs in a while loop because while changes are synced to db, another
// runnable may be skipped. If we cause it to skip, we need to do its work.
while (true) {
Lock closeLock = mDatabase.getCloseLock();
closeLock.lock();
try {
// there is a potential race condition where another mSyncTriggers runnable
// can start running right after we get the tables list to sync.
final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
if (tablesToSync == null) {
return;
}
final int limit = tablesToSync.length;
try {
database.beginTransaction();
for (int tableId = 0; tableId < limit; tableId++) {
switch (tablesToSync[tableId]) {
case ObservedTableTracker.ADD:
startTrackingTable(database, tableId);
break;
case ObservedTableTracker.REMOVE:
stopTrackingTable(database, tableId);
break;
}
}
database.setTransactionSuccessful();
} finally {
database.endTransaction();
}
mObservedTableTracker.onSyncCompleted();
} finally {
closeLock.unlock();
}
}
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
}
}
這個(gè)方法看起來很長(zhǎng),其實(shí)是在做一件事.ObservedTableTracker維護(hù)了一個(gè)需要被觀察的表的列表,就是發(fā)現(xiàn)有新的表需要被觀察就執(zhí)行startTrackingTable(database, tableId);,有表不需要被觀察了就執(zhí)行stopTrackingTable(database, tableId);。
繼續(xù)往下看,看看這兩個(gè)方法做了什么?
private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mShadowTableLookup.get(tableId, mTableNames[tableId]);
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("DROP TRIGGER IF EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
writableDb.execSQL(stringBuilder.toString());
}
}
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mShadowTableLookup.get(tableId, mTableNames[tableId]);
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN INSERT OR REPLACE INTO ")
.append(UPDATE_TABLE_NAME)
.append(" VALUES(null, ")
.append(tableId)
.append("); END");
writableDb.execSQL(stringBuilder.toString());
}
}
插曲: InvalidationTracker自己維護(hù)了一個(gè)叫room_table_modification_log的表,有兩個(gè)字段,一個(gè)是version它是自增的,還有一個(gè)是table_id,是被觀察的表的標(biāo)識(shí)。
其實(shí)就是當(dāng)需要去觀察一個(gè)表的時(shí)候startTrackingTable ()就在數(shù)據(jù)庫(kù)上創(chuàng)建了三個(gè)數(shù)據(jù)庫(kù)的 Trigger 。關(guān)于Trigger是什么這是數(shù)據(jù)庫(kù)基礎(chǔ)知識(shí),請(qǐng)自備。也就是說,只要在這個(gè)表上發(fā)生了插入修改或者刪除,就會(huì)往room_table_modification_log表里面插入一條數(shù)據(jù)INSERT OR REPLACE INTO room_table_modification_log VALUES(null, table_id)。
當(dāng)不需要觀察一個(gè)表的時(shí)候,就通過stopTrackingTable把這三個(gè)Trigger刪除掉。
以上就是我們?cè)趧?chuàng)建一個(gè)Query做的事情。
我們先對(duì)創(chuàng)建一個(gè)Query的流程做一個(gè)小的總結(jié):
- 通過自動(dòng)生成的代碼創(chuàng)建一個(gè)Flowable
- RxRoom會(huì)根據(jù)這個(gè)Flowable創(chuàng)建一個(gè)InvalidationTracker.Observer
- InvalidationTracker把這個(gè)Observer加到自己的觀察列表中
- 如果之前沒有人觀察過這個(gè)表,會(huì)去創(chuàng)建這個(gè)表上修改的Trigger
到這里,我們似乎應(yīng)該有一點(diǎn)頭緒了,既然每次有數(shù)據(jù)更新的時(shí)候就會(huì)往這個(gè)表中插入一條數(shù)據(jù),那在每一個(gè)Trascation結(jié)束之后去查這個(gè)表就應(yīng)該可以知道哪些表上的Query可以更新。所以我們回到上面的RoomDatabase中看看endTrasction之后的mInvalidationTracker.refreshVersionsAsync();到底做了什么?
/**
* Enqueues a task to refresh the list of updated tables.
* <p>
* This method is automatically called when {@link RoomDatabase#endTransaction()} is called but
* if you have another connection to the database or directly use {@link
* SupportSQLiteDatabase}, you may need to call this manually.
*/
@SuppressWarnings("WeakerAccess")
public void refreshVersionsAsync() {
// TODO we should consider doing this sync instead of async.
if (mPendingRefresh.compareAndSet(false, true)) {
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
boolean hasUpdatedTable = false;
try {
closeLock.lock();
if (!ensureInitialization()) {
return;
}
if (!mPendingRefresh.compareAndSet(true, false)) {
// no pending refresh
return;
}
if (mDatabase.inTransaction()) {
// current thread is in a transaction. when it ends, it will invoke
// refreshRunnable again. mPendingRefresh is left as false on purpose
// so that the last transaction can flip it on again.
return;
}
mCleanupStatement.executeUpdateDelete();
mQueryArgs[0] = mMaxVersion;
if (mDatabase.mWriteAheadLoggingEnabled) {
// This transaction has to be on the underlying DB rather than the RoomDatabase
// in order to avoid a recursive loop after endTransaction.
SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase();
try {
db.beginTransaction();
hasUpdatedTable = checkUpdatedTable();
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
} else {
hasUpdatedTable = checkUpdatedTable();
}
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
} finally {
closeLock.unlock();
}
if (hasUpdatedTable) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
entry.getValue().notifyByTableVersions(mTableVersions);
}
}
}
}
private boolean checkUpdatedTable() {
boolean hasUpdatedTable = false;
Cursor cursor = mDatabase.query(SELECT_UPDATED_TABLES_SQL, mQueryArgs);
//noinspection TryFinallyCanBeTryWithResources
try {
while (cursor.moveToNext()) {
final long version = cursor.getLong(0);
final int tableId = cursor.getInt(1);
mTableVersions[tableId] = version;
hasUpdatedTable = true;
// result is ordered so we can safely do this assignment
mMaxVersion = version;
}
} finally {
cursor.close();
}
return hasUpdatedTable;
}
}
static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
+ " WHERE " + VERSION_COLUMN_NAME
+ " > ? ORDER BY " + VERSION_COLUMN_NAME + " ASC;";
它實(shí)際上是執(zhí)行了mRefreshRunnable的,這個(gè)runnerable的邏輯非常清晰,先做一些邊界檢測(cè),然后去checkUpdatedTable,看有沒有用表在變化,怎么檢測(cè)??瓷厦娴膕ql語(yǔ)句,就是去查room_table_modification_log中相同的table_id的version,如果有大于之前保存的maxversion的數(shù)據(jù),說明有新的修改。然后調(diào)用ObserverWrapper 中的notifyByTableVersions去通知表上的觀察者。
這也就回到了上面最后一個(gè)問題當(dāng)一個(gè)表發(fā)生更改,監(jiān)聽一個(gè)查詢的實(shí)時(shí)變化是怎么做到的?。
MultiInstanceInvalidation
到這里我們還漏了一點(diǎn)沒有講到。那就是剛才說InvalidationTracker在RoomDatabase中的整個(gè)生命周期中的調(diào)用情況的時(shí)候還有初始化的時(shí)候和關(guān)閉數(shù)據(jù)庫(kù)的時(shí)候執(zhí)行了
mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,configuration.name);
和
mInvalidationTracker.stopMultiInstanceInvalidation();
因?yàn)槲覀冊(cè)谝弥胁豢赡苡肋h(yuǎn)是單標(biāo)上的查詢。也就是說我們一個(gè)查詢可能是連表的查詢,那么這個(gè)查詢的更新就會(huì)依賴于多個(gè)表的觀察操作。這就引出了框架中的一個(gè)經(jīng)典的CS結(jié)構(gòu)的兩個(gè)類MultiInstanceInvalidationClient, MultiInstanceInvalidationService
在初始化RoomDatabase的時(shí)候我們會(huì)開啟一個(gè)Client也就是startMultiInstanceInvalidation,其實(shí)就是創(chuàng)建了有一個(gè)Client
void startMultiInstanceInvalidation(Context context, String name) {
mMultiInstanceInvalidationClient = new MultiInstanceInvalidationClient(context, name, this,
mDatabase.getQueryExecutor());
}
看一下Client初始化的過程
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
從上面來看,其實(shí)在創(chuàng)建RoomDatabase的時(shí)候創(chuàng)建Client的時(shí)候,我們就也創(chuàng)建了一個(gè)InvalidationTracker.Observer,并且添加進(jìn)InvalidationTracker的觀察列表,當(dāng)這個(gè)表發(fā)生更新的時(shí)候會(huì)通過服務(wù)端Service broadcastInvalidation方法去通知客戶端Client。
@SuppressWarnings("WeakerAccess")
final Runnable mSetUpRunnable = new Runnable() {
@Override
public void run() {
try {
final IMultiInstanceInvalidationService service = mService;
if (service != null) {
mClientId = service.registerCallback(mCallback, mName);
mInvalidationTracker.addObserver(mObserver);
}
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot register multi-instance invalidation callback", e);
}
}
};
而每個(gè)Client在Setup的時(shí)候會(huì)去service.registerCallback
final IMultiInstanceInvalidationCallback mCallback =
new IMultiInstanceInvalidationCallback.Stub() {
@Override
public void onInvalidation(final String[] tables) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
mInvalidationTracker.notifyObserversByTableNames(tables);
}
});
}
};
這個(gè)callback就是是說收到broadcastInvalidation的信息的時(shí)候會(huì)去執(zhí)行。
這個(gè)流程就是在多個(gè)RoomDatabase之間是如何溝通的,也就是說在其他的RoomDatabase也修改了你這個(gè)表,那是如何通知到你發(fā)生改變的。
小結(jié)
到這里我們?cè)谡w上把這個(gè)Room是如何做到響應(yīng)式的做了一個(gè)框架的解析?;旧弦惨呀?jīng)瀏覽了整個(gè)Room的核心代碼。當(dāng)然其中還有很多的細(xì)節(jié),如果感興趣可以自己去好好讀一下。因?yàn)榭赡芪乙膊惶宄N乙彩浅趼缘淖x了一下做了一些自己的分析??隙ㄓ欣斫獠粚?duì)的地方。大家閱讀過程中請(qǐng)辯證看待,多多指正。