
Python 和 SQL:它們?cè)谝黄鹱叩酶h(yuǎn)
盡管有很多關(guān)于數(shù)據(jù)庫(kù)查詢性能優(yōu)化的內(nèi)容,但我發(fā)現(xiàn),有時(shí),提高 SQL 能力的最佳方法是將其與腳本語(yǔ)言(如 Python)結(jié)合使用。眾所周知,雖然 SQL 是一種用于讀取、操作和寫(xiě)入數(shù)據(jù)庫(kù)的強(qiáng)大方法,但它缺乏腳本語(yǔ)言的靈活性和實(shí)用性,這使得某些操作(如循環(huán))幾乎不可能進(jìn)行。
此外,底層數(shù)據(jù)庫(kù)的限制會(huì)降低性能,也有可能阻止執(zhí)行消耗資源的查詢。 例如,我曾經(jīng)遇到過(guò)一個(gè)持續(xù)存在的過(guò)度的元數(shù)據(jù)讀取錯(cuò)誤,我將在下面詳細(xì)說(shuō)明。像 Python 這樣的腳本語(yǔ)言可以為上述缺陷提供了一種解決方法,他能做的不僅僅是取代SQL 工作。 結(jié)合使用 Python 和 SQL 可以生成更強(qiáng)大、高效和清晰的腳本。
1. 遍歷多個(gè) SQL 表
Python 可以幫助提升 SQL 的最直白的方法之一是將查詢字符串合并到 Python 的循環(huán)結(jié)構(gòu)中以連續(xù)迭代多個(gè)查詢。使用在 Python 中定義的變量,你可以創(chuàng)建一個(gè)基本查詢并使用 SQL 文本和 Python 變量進(jìn)行操作。
例如,假設(shè)我們正在嘗試獲取一個(gè)列表,它按大小列出GCP 項(xiàng)目中包含的所有數(shù)據(jù)集和表。
在純 SQL 中,你的查詢語(yǔ)句應(yīng)該是:
SELECT * FROM `my_project.dataset_1.INFORMATION_SCHEMA`
UNION ALL
SELECT * FROM `my_project.dataset_2.INFORMATION_SCHEMA`
UNION ALL
SELECT * FROM `my_project.dataset_3.INFORMATION_SCHEMA`
使用Python實(shí)現(xiàn)的話,我們就不需要手動(dòng)地查詢一個(gè)又一個(gè)的數(shù)據(jù)集:
from google.cloud import bigquery
datasets = ['dataset_1', 'dataset_2', 'dataset_3']
bq_client = bigquery.Client()
for dataset in datasets:
get_datasets = bq_client.query("SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size
FROM `"+dataset.dataset_id+"`.__TABLES__ GROUP BY 1, 2, 3")
tables = get_datasets.result()
for table in tables:
dataset_id = table.dataset
table_id = table.table_id
size = table.size_bytes
gb_size = table.gb_size
print(dataset_id, table_id, size, gb_size)
盡管有兩個(gè)循環(huán),這可能看起來(lái)有些復(fù)雜,但我們所做的只是循環(huán)遍歷數(shù)據(jù)集列表。在程序里我們要更改的只是我們引用的數(shù)據(jù)集,本質(zhì)上,創(chuàng)建的是與 UNION 相同數(shù)目的查詢,但重復(fù)性手動(dòng)工作少很多。
""" SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size
FROM `my_project.dataset_1.`__TABLES__ GROUP BY 1, 2, 3 """
""" SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size
FROM `my_project.dataset_2.`__TABLES__ GROUP BY 1, 2, 3 """
""" SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size
FROM `my_project.dataset_3.`__TABLES__ GROUP BY 1, 2, 3 """
如果你的實(shí)際工作中需要查詢這么多的數(shù)據(jù),可以使用Python的 bq_client.list_datasets() 函數(shù)生成數(shù)據(jù)集列表。需要注意的是,在 SQL 中是可以進(jìn)行循環(huán)的,但通常必須采取額外的步驟,例如定義變量和創(chuàng)建 UDF 來(lái)完成所需的操作??梢詤⒖迹?br>
SQL WHILE loop with simple examples
www.sqlshack.com/sql-while-loop-with-simple-examples/
2. 自動(dòng)化模式定義
BigQuery Python Client允許開(kāi)發(fā)人員將模式(Schema)定義為列表,這個(gè)列表稍后可以導(dǎo)入加載函數(shù)(Loading Functions)。在腳本中定義 BigQuery 結(jié)構(gòu)而不是默認(rèn)靜態(tài)創(chuàng)建,這樣的做法會(huì)更可靠點(diǎn),以免類型出錯(cuò)。
如果您手動(dòng)創(chuàng)建了一個(gè)BigQuery 結(jié)構(gòu),它可能如下所示:
schema = [
bigquery.SchemaField("first_name", "STRING"),
bigquery.SchemaField("last_name", "STRING"),
bigquery.SchemaField("age", "INTEGER")
]
這樣創(chuàng)建沒(méi)有什么大問(wèn)題,但是,當(dāng)您處理需要 100 列或更多列的數(shù)據(jù)時(shí),這就...很沒(méi)勁了。我的解決方案是一個(gè)相對(duì)簡(jiǎn)單的 Python 函數(shù),它以類似于我之前描述的循環(huán)的方式自動(dòng)填充這些字段。
def create_schema(field_list: list, type_list: list):
schema_list = []
for fields, types in zip(field_list, type_list):
schema = bigquery.SchemaField(fields, types)
schema_list.append(schema)
return schema_list
該函數(shù)的輸出將與上面定義的模式完全相同,但如果我不指定字段是否為 NULLABLE,它將默認(rèn)為 NULLABLE。
3. 在 1 行 Python 中轉(zhuǎn)換為Data Frame
有一種優(yōu)雅而簡(jiǎn)單的方法可以創(chuàng)建從 SQL 查詢派生的DataFrame。它只需要一行 Python,尤其是當(dāng)你查詢存儲(chǔ)在配置文件中的數(shù)據(jù)時(shí)。
query = """ SELECT * FROM `my_project.dataset.table` """
query_job = bq_client.query(cfg.query).to_dataframe()
你甚至可以在同一行代碼里直接到數(shù)據(jù)結(jié)果存儲(chǔ)到一個(gè)CSV文件中。
query_job = bq_client.query(cfg.query).to_dataframe().to_csv('query_output.csv')
至少對(duì)我來(lái)說(shuō),這是一個(gè)比必須從 SQL 引擎的 UI 導(dǎo)出或下載報(bào)告更簡(jiǎn)化的過(guò)程。
4. 解決 SQL 環(huán)境限制
在我曾經(jīng)的一個(gè)自動(dòng)化審計(jì)項(xiàng)目中,需要?jiǎng)h除我們數(shù)據(jù)倉(cāng)庫(kù)中未使用的表??梢韵胂?,此過(guò)程涉及大量元數(shù)據(jù)。您可能不知道,BigQuery 對(duì)每次實(shí)例中允許的元讀取數(shù)量施加了限制。
我不斷遇到warning和error,告訴我無(wú)法運(yùn)行查詢,因?yàn)樗鼑L試了太多元讀取。起初我試圖將我的工作分成兩個(gè)單獨(dú)的 CTE,但因?yàn)樗鼈冊(cè)谕粋€(gè)查詢中運(yùn)行,所以我仍然會(huì)遇到同樣的錯(cuò)誤。然后一位資深工程師建議我應(yīng)該在 Python 中以Chunk的形式運(yùn)行這個(gè)東西,并使用 Pandas 進(jìn)行連接。
這種方法非常有效,以至于我最終將整個(gè)腳本轉(zhuǎn)換為 Pandas,只將查詢保留為原始數(shù)據(jù)源。如果您運(yùn)行的查詢過(guò)于消耗資源,請(qǐng)考慮將其拆分為多個(gè)部分,在 Python 中運(yùn)行并使用 Pandas 重新加入。因?yàn)槲乙玫?CTE 有數(shù)百行長(zhǎng)并且涉及 50 多個(gè)元讀取,所以下面只是其很小的代碼片段:
query_1 = """ SELECT * FROM a_resource_consuming_cte_1 """
query_2 = """ SELECT * FROM a_resource_consuming_cte_2 """
query_1_df = bq_client.query(query_1).to_dataframe()
query_2_df = bq_client.query(query_2).to_dataframe()
final_df = pd.concat([query_1_df, query_2_df]
5. 添加/截?cái)?(Append/Truncate)
對(duì) BigQuery 的一個(gè)主要抱怨是它不支持 APPEND/TRUNCATE 操作。我指我們可以將一條記錄添加到表中或覆蓋它們。目前,BigQuery 確實(shí)包含一個(gè)允許開(kāi)發(fā)人員指定這兩個(gè)操作的參數(shù)。因此,如果你只想覆蓋一段特定時(shí)間范圍內(nèi)的 SQL 表,就需要有點(diǎn)創(chuàng)意了。
值得慶幸的是,結(jié)合 Python 和 SQL 將使我們能夠進(jìn)行簡(jiǎn)單進(jìn)行這兩個(gè)操作。
在開(kāi)始編寫(xiě)代碼之前,讓我們先談?wù)劄槭裁慈粘P枰采w行。假設(shè)有一個(gè)每天更新幾次的電子表格,并且在每天結(jié)束時(shí),我們想要上傳今天日期的條目結(jié)果。由于工作表一天可以編輯多次,因此簡(jiǎn)單地附加數(shù)據(jù)會(huì)產(chǎn)生重復(fù)的行,從而導(dǎo)致數(shù)據(jù)混亂。所以我們會(huì)希望在加載數(shù)據(jù)時(shí)消除任何重復(fù)項(xiàng)。
最簡(jiǎn)單的方法是將 CRUD 語(yǔ)句與 Python/Pandas 代碼配對(duì),這將創(chuàng)建我們要覆蓋的數(shù)據(jù)子集。
crud_statement = """ DELETE FROM table WHERE date = CURRENT_DATE() """
bq_client.query(crud_statement)
df = df[(df['date'] == date.today())]
bq_client.load_table_from_dataframe(df, job_config)
這將使我們的數(shù)據(jù)絕對(duì)相等并能夠反映每次運(yùn)行腳本時(shí)的實(shí)時(shí)變化。
總結(jié)
將 Python 等腳本語(yǔ)言與 SQL 相結(jié)合,可以為僅使用 SQL 無(wú)法實(shí)現(xiàn)的操作創(chuàng)造更多可能性。由于許多數(shù)據(jù)作業(yè)不僅需要 SQL 知識(shí),而且至少還需要 Python 等腳本語(yǔ)言的知識(shí),因此掌握 SQL 和 Python 結(jié)合的強(qiáng)大功能是很必要的。