0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

深度剖析SQL中的Grouping Sets語句2

jf_78858299 ? 來源:元閏子的邀請 ? 作者:元閏子 ? 2023-05-10 17:44 ? 次閱讀

Expand 算子的實現(xiàn)

Expand 算子在 Spark SQL 源碼中的實現(xiàn)為 ExpandExec 類(Spark SQL 中的算子實現(xiàn)類的命名都是 XxxExec 的格式,其中 Xxx 為具體的算子名,比如 Project 算子的實現(xiàn)類為 ProjectExec),核心代碼如下:

/**
 * Apply all of the GroupExpressions to every input row, hence we will get
 * multiple output rows for an input row.
 * @param projections The group of expressions, all of the group expressions should
 *                    output the same schema specified bye the parameter `output`
 * @param output      The output Schema
 * @param child       Child operator
 */
case class ExpandExec(
    projections: Seq[Seq[Expression]],
    output: Seq[Attribute],
    child: SparkPlan)
  extends UnaryExecNode with CodegenSupport {

  ...
  // 關(guān)鍵點1,將child.output,也即上游算子輸出數(shù)據(jù)的schema,
  // 綁定到表達式數(shù)組exprs,以此來計算輸出數(shù)據(jù)
  private[this] val projection =
    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)

  // doExecute()方法為Expand算子執(zhí)行邏輯所在
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")

    // 處理上游算子的輸出數(shù)據(jù),Expand算子的輸入數(shù)據(jù)就從iter迭代器獲取
    child.execute().mapPartitions { iter =>
      // 關(guān)鍵點2,projections對應(yīng)了Grouping Sets里面每個grouping set的表達式,
      // 表達式輸出數(shù)據(jù)的schema為this.output, 比如 (quantity, city, car_model, spark_grouping_id)
      // 這里的邏輯是為它們各自生成一個UnsafeProjection對象,通過該對象的apply方法就能得出Expand算子的輸出數(shù)據(jù)
      val groups = projections.map(projection).toArray
      new Iterator[InternalRow] {
        private[this] var result: InternalRow = _
        private[this] var idx = -1  // -1 means the initial state
        private[this] var input: InternalRow = _

        override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext

        override final def next(): InternalRow = {
          // 關(guān)鍵點3,對于輸入數(shù)據(jù)的每一條記錄,都重復(fù)使用N次,其中N的大小對應(yīng)了projections數(shù)組的大小,
          // 也即Grouping Sets里指定的grouping set的數(shù)量
          if (idx <= 0) {
            // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple
            input = iter.next()
            idx = 0
          }
          // 關(guān)鍵點4,對輸入數(shù)據(jù)的每一條記錄,通過UnsafeProjection計算得出輸出數(shù)據(jù),
          // 每個grouping set對應(yīng)的UnsafeProjection都會對同一個input計算一遍
          result = groups(idx)(input)
          idx += 1

          if (idx == groups.length && iter.hasNext) {
            idx = 0
          }

          numOutputRows += 1
          result
        }
      }
    }
  }
  ...
}

ExpandExec 的實現(xiàn)并不復(fù)雜,想要理解它的運作原理,關(guān)鍵是看懂上述源碼中提到的 4 個關(guān)鍵點。

關(guān)鍵點 1關(guān)鍵點 2 是基礎(chǔ),關(guān)鍵點 2 中的 groups 是一個 UnsafeProjection[N] 數(shù)組類型,其中每個 UnsafeProjection 代表了 Grouping Sets 語句里指定的 grouping set,它的定義是這樣的:

// A projection that returns UnsafeRow.
abstract class UnsafeProjection extends Projection {
  override def apply(row: InternalRow): UnsafeRow
}

// The factory object for `UnsafeProjection`.
object UnsafeProjection
    extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {
  // Returns an UnsafeProjection for given sequence of Expressions, which will be bound to
  // `inputSchema`.
  def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): UnsafeProjection = {
    create(bindReferences(exprs, inputSchema))
  }
  ...
}

UnsafeProjection 起來了類似列投影的作用,其中, apply 方法根據(jù)創(chuàng)建時的傳參 exprsinputSchema,對輸入記錄進行列投影,得出輸出記錄。

比如,前面的 GROUPING SETS ((city, car_model), (city), (car_model), ())例子,它對應(yīng)的 groups 是這樣的:

圖片

其中,AttributeReference 類型的表達式,在計算時,會直接引用輸入數(shù)據(jù)對應(yīng)列的值;Iteral 類型的表達式,在計算時,值是固定的。

關(guān)鍵點 3關(guān)鍵點 4 是 Expand 算子的精華所在,ExpandExec 通過這兩段邏輯,將每一個輸入記錄, 擴展(Expand) 成 N 條輸出記錄。

關(guān)鍵點 4groups(idx)(input) 等同于 groups(idx).apply(input)

還是以前面 GROUPING SETS ((city, car_model), (city), (car_model), ()) 為例子,效果是這樣的:

圖片

到這里,我們已經(jīng)弄清楚 Expand 算子的工作原理,再回頭看前面提到的 3 個問題,也不難回答了:

  1. Expand 的實現(xiàn)邏輯是怎樣的,為什么能達到 Union All 的效果?
    如果說 Union All 是先聚合再聯(lián)合,那么 Expand 就是先聯(lián)合再聚合。Expand 利用 groups 里的 N 個表達式對每條輸入記錄進行計算,擴展成 N 條輸出記錄。后面再聚合時,就能達到與 Union All 一樣的效果了。
  2. Expand 節(jié)點的輸出數(shù)據(jù)是怎樣的

在 schema 上,Expand 輸出數(shù)據(jù)會比輸入數(shù)據(jù)多出 spark_grouping_id 列;在記錄數(shù)上,是輸入數(shù)據(jù)記錄數(shù)的 N 倍。

  1. spark_grouping_id 列的作用是什么 ?

spark_grouping_id 給每個 grouping set 進行編號,這樣,即使在 Expand 階段把數(shù)據(jù)先聯(lián)合起來,在 Aggregate 階段(把 spark_grouping_id 加入到分組規(guī)則)也能保證數(shù)據(jù)能夠按照每個 grouping set 分別聚合,確保了結(jié)果的正確性。

查詢性能對比

從前文可知,Grouping Sets 和 Union All 兩個版本的 SQL 語句有著一樣的效果,但是它們的執(zhí)行計劃卻有著巨大的差別。下面,我們將比對兩個版本之間的執(zhí)行性能差異。

spark-sql 執(zhí)行完 SQL 語句之后會打印耗時信息,我們對兩個版本的 SQL 分別執(zhí)行 10 次,得到如下信息:

// Grouping Sets 版本執(zhí)行10次的耗時信息
// SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY GROUPING SETS ((city, car_model), (city), (car_model), ()) ORDER BY city, car_model;
Time taken: 0.289 seconds, Fetched 15 row(s)
Time taken: 0.251 seconds, Fetched 15 row(s)
Time taken: 0.259 seconds, Fetched 15 row(s)
Time taken: 0.258 seconds, Fetched 15 row(s)
Time taken: 0.296 seconds, Fetched 15 row(s)
Time taken: 0.247 seconds, Fetched 15 row(s)
Time taken: 0.298 seconds, Fetched 15 row(s)
Time taken: 0.286 seconds, Fetched 15 row(s)
Time taken: 0.292 seconds, Fetched 15 row(s)
Time taken: 0.282 seconds, Fetched 15 row(s)

// Union All 版本執(zhí)行10次的耗時信息
// (SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY city, car_model) UNION ALL (SELECT city, NULL as car_model, sum(quantity) AS sum FROM dealer GROUP BY city) UNION ALL (SELECT NULL as city, car_model, sum(quantity) AS sum FROM dealer GROUP BY car_model) UNION ALL (SELECT NULL as city, NULL as car_model, sum(quantity) AS sum FROM dealer) ORDER BY city, car_model;
Time taken: 0.628 seconds, Fetched 15 row(s)
Time taken: 0.594 seconds, Fetched 15 row(s)
Time taken: 0.591 seconds, Fetched 15 row(s)
Time taken: 0.607 seconds, Fetched 15 row(s)
Time taken: 0.616 seconds, Fetched 15 row(s)
Time taken: 0.64 seconds, Fetched 15 row(s)
Time taken: 0.623 seconds, Fetched 15 row(s)
Time taken: 0.625 seconds, Fetched 15 row(s)
Time taken: 0.62 seconds, Fetched 15 row(s)
Time taken: 0.62 seconds, Fetched 15 row(s)

可以算出,Grouping Sets 版本的 SQL 平均耗時為 0.276s ;Union All 版本的 SQL 平均耗時為 0.616s ,是前者的 2.2 倍 !

所以, Grouping Sets 版本的 SQL 不僅在表達上更加簡潔,在性能上也更加高效

RollUp 和 Cube

Group By 的高級用法中,還有 RollUpCube 兩個比較常用。

首先,我們看下 RollUp 語句

Spark SQL 官方文檔中 SQL Syntax 一節(jié)對 RollUp 語句的描述如下:

Specifies multiple levels of aggregations in a single statement. This clause is used to compute aggregations based on multiple grouping sets. ROLLUP is a shorthand for GROUPING SETS. (... 一些例子)

官方文檔中,把 RollUp 描述為 Grouping Sets 的簡寫,等價規(guī)則為:RollUp(A, B, C) == Grouping Sets((A, B, C), (A, B), (A), ())。

比如,Group By RollUp(city, car_model) 就等同于 Group By Grouping Sets((city, car_model), (city), ())。

下面,我們通過 expand extended 看下 RollUp 版本 SQL 的 Optimized Logical Plan:

spark-sql> explain extended SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY ROLLUP(city, car_model) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#2164 ASC NULLS FIRST, car_model#2165 ASC NULLS FIRST], true
+- Aggregate [city#2164, car_model#2165, spark_grouping_id#2163L], [city#2164, car_model#2165, sum(quantity#2159) AS sum#2150L]
   +- Expand [[quantity#2159, city#2157, car_model#2158, 0], [quantity#2159, city#2157, null, 1], [quantity#2159, null, null, 3]], [quantity#2159, city#2164, car_model#2165, spark_grouping_id#2163L]
      +- Project [quantity#2159, city#2157, car_model#2158]
         +- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#2156, city#2157, car_model#2158, quantity#2159], Partition Cols: []]
== Physical Plan ==
...

從上述 Plan 可以看出,RollUp 底層實現(xiàn)用的也是 Expand 算子,說明 RollUp 確實是基于 Grouping Sets 實現(xiàn)的。 而且 Expand [[quantity#2159, city#2157, car_model#2158, 0], [quantity#2159, city#2157, null, 1], [quantity#2159, null, null, 3]] 也表明 RollUp 符合等價規(guī)則。

下面,我們按照同樣的思路,看下 Cube 語句 。

Spark SQL 官方文檔中 SQL Syntax 一節(jié)對 Cube 語句的描述如下:

CUBE clause is used to perform aggregations based on combination of grouping columns specified in the GROUP BYclause. CUBE is a shorthand for GROUPING SETS. (... 一些例子)

同樣,官方文檔把 Cube 描述為 Grouping Sets 的簡寫,等價規(guī)則為:Cube(A, B, C) == Grouping Sets((A, B, C), (A, B), (A, C), (B, C), (A), (B), (C), ())

比如,Group By Cube(city, car_model) 就等同于 Group By Grouping Sets((city, car_model), (city), (car_model), ())

下面,我們通過 expand extended 看下 Cube 版本 SQL 的 Optimized Logical Plan:

spark-sql> explain extended SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY CUBE(city, car_model) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#2202 ASC NULLS FIRST, car_model#2203 ASC NULLS FIRST], true
+- Aggregate [city#2202, car_model#2203, spark_grouping_id#2201L], [city#2202, car_model#2203, sum(quantity#2197) AS sum#2188L]
   +- Expand [[quantity#2197, city#2195, car_model#2196, 0], [quantity#2197, city#2195, null, 1], [quantity#2197, null, car_model#2196, 2], [quantity#2197, null, null, 3]], [quantity#2197, city#2202, car_model#2203, spark_grouping_id#2201L]
      +- Project [quantity#2197, city#2195, car_model#2196]
         +- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#2194, city#2195, car_model#2196, quantity#2197], Partition Cols: []]
== Physical Plan ==
...

從上述 Plan 可以看出,Cube 底層用的也是 Expand 算子,說明 Cube 確實基于 Grouping Sets 實現(xiàn),而且也符合等價規(guī)則。

所以,RollUpCube 可以看成是 Grouping Sets 的語法糖,在底層實現(xiàn)和性能上是一樣的。

最后

本文重點討論了 Group By 高級用法 Groupings Sets 語句的功能和底層實現(xiàn)。

雖然 Groupings Sets 的功能,通過 Union All 也能實現(xiàn),但前者并非后者的語法糖,它們的底層實現(xiàn)完全不一樣。Grouping Sets 采用的是先聯(lián)合再聚合的思路,通過 spark_grouping_id 列來保證數(shù)據(jù)的正確性;Union All 則采用先聚合再聯(lián)合的思路。Grouping Sets 在 SQL 語句表達和性能上都有更大的優(yōu)勢

Group By 的另外兩個高級用法 RollUpCube 則可以看成是 Grouping Sets 的語法糖,它們的底層都是基于 Expand 算子實現(xiàn), 在性能上與直接使用 Grouping Sets 是一樣的,但在 SQL 表達上更加簡潔

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 數(shù)據(jù)
    +關(guān)注

    關(guān)注

    8

    文章

    6715

    瀏覽量

    88316
  • SQL
    SQL
    +關(guān)注

    關(guān)注

    1

    文章

    750

    瀏覽量

    43900
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4237

    瀏覽量

    61973
收藏 人收藏

    評論

    相關(guān)推薦

    在Delphi動態(tài)地使用SQL查詢語句

    在Delphi動態(tài)地使用SQL查詢語句在一般的數(shù)據(jù)庫管理系統(tǒng),通常都需要應(yīng)用SQL查詢語句
    發(fā)表于 05-10 11:10

    SQL語句的兩種嵌套方式

    一般情況下,SQL語句是嵌套在宿主語言(如C語言)的。有兩種嵌套方式:1.調(diào)用層接口(CLI):提供一些庫,庫的函數(shù)和方法實現(xiàn)SQL的調(diào)
    發(fā)表于 05-23 08:51

    區(qū)分SQL語句與主語言語句

    為了區(qū)分SQL語句與主語言語句,所有SQL 語句必須加前綴EXEC SQL處理過程:含嵌入式
    發(fā)表于 10-28 08:44

    為什么要動態(tài)sql語句?

    為什么要動態(tài)sql語句?因為動態(tài)sql語句能夠提供一些比較友好的機制1、可以使得一些在編譯過程無法獲得完整的
    發(fā)表于 12-20 06:00

    數(shù)據(jù)庫SQL語句電子教程

    電子發(fā)燒友為您提供了數(shù)據(jù)庫SQL語句電子教程,幫助您了解數(shù)據(jù)庫 SQL語句 ,學(xué)習(xí)讀懂?dāng)?shù)據(jù)庫SQL語句
    發(fā)表于 07-14 17:09 ?0次下載

    sql語句實例講解

    SQL是用來存取關(guān)系數(shù)據(jù)庫的語言,具有查詢、操縱、定義和控制關(guān)系型數(shù)據(jù)庫的四方面功能。常見的關(guān)系數(shù)據(jù)庫有Oracle,SQLServer,DB2,Sybase。開源不收費的有MYSQL,SQLLite等。今天我們主要以MYSQL為例子,講解
    發(fā)表于 11-17 12:39 ?8997次閱讀
    <b class='flag-5'>sql</b><b class='flag-5'>語句</b>實例講解

    如何使用navicat或PHPMySQLAdmin導(dǎo)入SQL語句

    很多朋友問我們怎么導(dǎo)入SQL語句,這是新人最需要知道的東西,現(xiàn)制作圖文教程,希望對新手有所幫助,順便文末附SQL語句導(dǎo)入導(dǎo)出大全,高手可以提供更加詳細(xì)的教程。
    發(fā)表于 04-10 15:06 ?2次下載

    VSSQL命令語句的詳細(xì)資料免費下載

    本文檔的主要內(nèi)容詳細(xì)介紹的是微軟VS(Microsoft Visual Studio)SQL命令語句的詳細(xì)資料免費下載
    發(fā)表于 10-09 11:45 ?8次下載

    如何使用SQL修復(fù)語句程序說明

    本文檔的主要內(nèi)容詳細(xì)介紹的是如何使用SQL修復(fù)語句程序說明。
    發(fā)表于 10-31 15:09 ?5次下載

    嵌入式SQL語句

    為了區(qū)分SQL語句與主語言語句,所有SQL 語句必須加前綴EXEC SQL處理過程:含嵌入式
    發(fā)表于 10-21 11:51 ?4次下載
    嵌入式<b class='flag-5'>SQL</b><b class='flag-5'>語句</b>

    Group By高級用法Groupings Sets語句的功能和底層實現(xiàn)

    SQL Group By 語句大家都很熟悉,根據(jù)指定的規(guī)則對數(shù)據(jù)進行分組,常常和聚合函數(shù)一起使用。
    的頭像 發(fā)表于 07-04 10:26 ?2982次閱讀

    Java如何解析、格式化、生成SQL語句

    昨天在群里看到有小伙伴問,Java里如何解析SQL語句然后格式化SQL,是否有現(xiàn)成類庫可以使用?
    的頭像 發(fā)表于 04-10 11:59 ?853次閱讀

    深度剖析SQLGrouping Sets語句1

    SQL `Group By` 語句大家都很熟悉, **根據(jù)指定的規(guī)則對數(shù)據(jù)進行分組** ,常常和**聚合函數(shù)**一起使用。
    的頭像 發(fā)表于 05-10 17:44 ?610次閱讀
    <b class='flag-5'>深度</b><b class='flag-5'>剖析</b><b class='flag-5'>SQL</b><b class='flag-5'>中</b>的<b class='flag-5'>Grouping</b> <b class='flag-5'>Sets</b><b class='flag-5'>語句</b>1

    sql查詢語句大全及實例

    SQL(Structured Query Language)是一種專門用于數(shù)據(jù)庫管理系統(tǒng)的標(biāo)準(zhǔn)交互式數(shù)據(jù)庫查詢語言。它被廣泛應(yīng)用于數(shù)據(jù)庫管理和數(shù)據(jù)操作領(lǐng)域。在本文中,我們將為您詳細(xì)介紹SQL查詢語句
    的頭像 發(fā)表于 11-17 15:06 ?1161次閱讀

    oracle執(zhí)行sql查詢語句的步驟是什么

    Oracle數(shù)據(jù)庫是一種常用的關(guān)系型數(shù)據(jù)庫管理系統(tǒng),具有強大的SQL查詢功能。Oracle執(zhí)行SQL查詢語句的步驟包括編寫SQL語句、解析
    的頭像 發(fā)表于 12-06 10:49 ?775次閱讀