// Assumes a SparkSession `spark`, a JDBC URL `mysql_url`, an Avro decoder
// `avroDeserialize`, and a case class `ApRawData` are defined elsewhere.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._

// Read metadata from MySQL; it is joined with the real-time Kafka messages below.
// The MySQL data changes slowly, e.g. some new rows are added every few days.
val df_meta = spark.read
.format("jdbc")
.option("url", mysql_url)
.option("dbtable", "v_entity_ap_rel")
.load()
val df_cell = spark.read
.format("jdbc")
.option("url", mysql_url)
.option("dbtable", "tmac_org_cat")
.load()
.filter($"Category" === 1)
.select("mac3", "Category", "Brand")
df_meta.cache()
df_cell.cache()
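// Caching pins these JDBC snapshots in memory; without the cache, every
// micro-batch of the streaming join below would re-scan the MySQL tables.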
// Read Kafka and join with df_meta to get the expected result.
// Kafka carries the MAC addresses and the collected measurements; each Kafka
// message is joined with the two cached DataFrames above to form the final
// result. The problem: the structured streaming query runs continuously,
// while the MySQL read happens only once. Short of restarting the Spark
// application, how can the data in MySQL be reloaded every so often?
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "namenode:9092")
.option("fetch.message.max.bytes", "50000000")
.option("kafka.max.partition.fetch.bytes", "50000000")
.option("subscribe", "rawdb.raw_data")
.option("failOnDataLoss", true)
.option("startingOffsets", "latest")
.load()
.select($"value".as[Array[Byte]])
.map(avroDeserialize(_))
.as[ApRawData]
.selectExpr("apMacAddress APMAC", "rssi RRSSI", "sourceMacAddress CLIENTMAC", "time STIME", "updatingTime")
.filter($"stime".lt(current_timestamp()))
.filter($"stime".gt(from_unixtime(unix_timestamp(current_timestamp()).minus(5 * 60))))
.as("d")
.join(df_cell.as("c"), substring($"d.CLIENTMAC", 1, 6) === $"c.mac3")
.as("a")
.join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
val query = df
.selectExpr(
"ENTITYID",
"CLIENTMAC",
"STIME",
"CASE WHEN a.rrssi >= b.rssi THEN '1' WHEN a.rrssi < b.nearbyrssi THEN '3' ELSE '2' END FLAG",
"substring(stime, 1, 10) DATE",
"substring(stime, 12, 2) HOUR")
.repartition(col("DATE"), col("HOUR"))
.writeStream
.format("csv")
.option("header",false)
.partitionBy("DATE", "HOUR")
.option("checkpointLocation", "/user/root/t_cf_table_chpt")
.trigger(ProcessingTime("5 minutes"))
.start("T_CF_TABLE")
.awaitTermination()
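One pattern that is sometimes suggested for refreshing the static side of a stream-static join: unpersist and re-cache the JDBC DataFrames from a background thread. Each micro-batch re-plans the static side, so once the cache entry is replaced, subsequent batches see the refreshed data. A minimal sketch, assuming the df_meta/df_cell defined above (the reloadMeta helper and the one-hour interval are illustrative, not part of the original code):

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative helper: drop the cached snapshots and re-materialize them.
// cache() is lazy, so count() forces the re-read from MySQL here rather
// than inside the next micro-batch.
def reloadMeta(): Unit = {
  df_meta.unpersist()
  df_cell.unpersist()
  df_meta.cache()
  df_cell.cache()
  df_meta.count()
  df_cell.count()
}

// Refresh once an hour (the interval is an assumption; tune as needed).
val reloader = Executors.newSingleThreadScheduledExecutor()
reloader.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = reloadMeta()
}, 1, 1, TimeUnit.HOURS)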