Scio 解释器
原文链接 : http://zeppelin.apache.org/docs/0.7.2/interpreter/scio.html
译文链接 : https://www.apachecn.org/pages/viewpage.action?pageId=10030912
概述
Scio是Google云数据流的Scala DSL 和由Spark和Scalding启发的Apache Beam。有关更多信息,请参阅当前的wiki和API文档。
配置
名称 | 默认值 | 描述 |
---|---|---|
zeppelin.scio.argz | --runner = InProcessPipelineRunner | Scio解释者广泛的论据。文档:https://github.com/spotify/scio/wiki#options 和 https://cloud.google.com/dataflow/pipelines/specifying-exec-params |
zeppelin.scio.maxResult | 1000 | 要显示的最大SC选择结果数 |
启用Scio解释器
在笔记本中,要启用Scio解释器,请单击Gear图标并选择beam(beam.scio)。
使用Scio解释器
在段落中,用于%beam.scio
选择Scio解释器。您可以使用与香草Scala REPL和Scio REPL相同的方式。状态(如变量,导入,执行等)在所有_Scio_段落之间共享。有一个特殊的变量argz,它包含来自Scio解释器设置的参数。最简单的方法是通过标准创建Scio上下文ContextAndArgs
。
%beam.scio
val (sc, args) = ContextAndArgs(argz)
sc
以常规管道/ REPL的方式使用上下文。
示例:
%beam.scio
val (sc, args) = ContextAndArgs(argz)
sc.parallelize(Seq("foo", "foo", "bar")).countByValue.closeAndDisplay()
如果您关闭Scio上下文,请继续创建一个新的ContextAndArgs
。请参考Scio wiki更复杂的例子。您可以关闭Scio上下文与Scio REPL相同,并使用Zeppelin显示帮助程序同步关闭并显示结果 - 在下面阅读更多。
进展
一次只能运行一个段落。没有总体进展的概念,因此进度条将显示0
。
SCollection显示助手
Scio解释器带有显示助手,以方便与Zeppelin笔记本电脑的工作。只需使用closeAndDisplay()
上SCollection
,关闭背景和显示结果。结果数量受限于zeppelin.scio.maxResult
(默认为1000)。
支持的SCollection
类型:
- Scio键入的BigQuery
- Scala的产品(案例 classes, tuples)
- Google BigQuery的TableRow
- Apache Avro
- 所有Scala的
AnyVal
助手方法
不同的对象有不同的帮助方法。您可以轻松地显示结果SCollection
,Future[Tap]
和Tap
。
SCollection
帮手
SCollection
closeAndDisplay
对于上面列出的类型,具有Zeppelin辅助方法。使用它来同步关闭Scio上下文,一旦可用的拉和显示结果。
Future[Tap]
帮手
Future[Tap]
waitAndDisplay
对于上面列出的类型,具有Zeppelin辅助方法。使用它来同步等待结果,一旦可用拉和显示结果。
Tap
帮手
Tap
display
对于上面列出的类型,具有Zeppelin辅助方法。使用它来拉和显示结果。
示例
BigQuery示例:
%beam.scio
@BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
|FROM [bigquery-samples:airline_ontime_data.flights]
|group by departure_airport
|order by 2 desc
|limit 10""".stripMargin) class Flights
val (sc, args) = ContextAndArgs(argz)
sc.bigQuerySelect(Flights.query).closeAndDisplay(Flights.schema)
BigQuery typed 示例:
%beam.scio
@BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
|FROM [bigquery-samples:airline_ontime_data.flights]
|group by departure_airport
|order by 2 desc
|limit 10""".stripMargin) class Flights
val (sc, args) = ContextAndArgs(argz)
sc.typedBigQuery[Flights]().flatMap(_.no_of_delays).mean.closeAndDisplay()
Avro示例:
%beam.scio
import com.spotify.data.ExampleAvro
val (sc, args) = ContextAndArgs(argz)
sc.avroFile[ExampleAvro]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay()
具有视图模式的Avro示例:
%beam.scio
import com.spotify.data.ExampleAvro
import org.apache.avro.Schema
val (sc, args) = ContextAndArgs(argz)
val view = Schema.parse("""{"type":"record","name":"ExampleAvro","namespace":"com.spotify.data","fields":[{"name":"track","type":"string"}, {"name":"artist", "type":"string"}]}""")
sc.avroFile[EndSongCleaned]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay(view)
Google凭据
Scio Interpreter会尝试从其环境推断出您的Google Cloud凭据,它将进入帐户:
argz
解释器设置(doc)- 环境变量(
GOOGLE_APPLICATION_CREDENTIALS
) - gcloud配置
BigQuery宏凭证
目前用于宏扩展的BigQuery项目是使用Google Dataflow的DefaultProjectFactory().create() 推断的。