Spark整合Elastic

环境: spark 1.6, ElasticSearch 1.6.1, elasticsearch-hadoop

通过 elasticsearch-hadoop可以将spark 处理后的数据保存在Elastic上,后续数据的检查和查询非常方便。

https://db-blog.web.cern.ch/blog/prasanth-kothuri/2016-05-integrating-hadoop-and-elasticsearch-%E2%80%93-part-2-%E2%80%93-writing-and-querying

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

https://spark-packages.org/package/elastic/elasticsearch-hadoop

spark mllib训练模型后如何做在线预测

最近在想一个问题,使用spark mllib训练后的模型如何做一个在线预测的服务? 毕竟mllib只提供离线的训练和预测。想到大概有四种方法:

1. 使用spark streaming + kafka

直接使用spark streaming加载训练好的模型,然后通过从kafka上读取特征来预测数据,并将预测结果写回kafka中供客户端获取到。

2. spark + grpc

通过将spark mllib中的predict函数做成rpc service的形式,具体参考:

https://scalapb.github.io/grpc.html

3. spark + spray

通过spark mllib中的predict函数做restful的形式来预测。

4. spark + python flask

通过python调用spark mllib训练好的模型,借助flash提供接口

5. spark + python grpc

通过python grpc调用spark mllib训练好的模型,提供其它语言rpc接口

 

 

目前方法1, 4, 5是可行, 方法2,3并没有实际实现的经验,还需再探索。

spark streaming调用http get存储数据

环境:spark 1.6, 存储是一个http get的服务

在build.sbt中添加”org.apache.httpcomponents” % “httpclient” % “4.5.2”  ,记得第一个分隔符是%,而不是%%。

经过多次尝试,最终代码如下:

agg_wd_business.foreach(d => {
val httpParams = new BasicHttpParams()

HttpConnectionParams.setConnectionTimeout(httpParams, 50)
HttpConnectionParams.setSoTimeout(httpParams, 50)
val client = new DefaultHttpClient(httpParams)
val request = new HttpGet(“http://xxx.xxx.xxx.xxx:9010/rt?” + URLEncoder.encode(d, “UTF-8”))
request.addHeader(“Connection”, “close”)
try{
val response = client.execute(request)
val handler = new BasicResponseHandler()
handler.handleResponse(response).trim.toString
}catch{
case ex: SocketTimeoutException => None
case ex: Exception => None
}
})

发送一个http get请求,设置超时,设置为短连接,并不保证请求一定成功。由于生成的数据有30万左右,得调用http get这么次,而nginx搭配的服务并不能快速地响应。

spark streaming读取kafka上的protobuf格式的数据

1. 通过proto文件生成java文件夹

vi test1.proto

syntax = “proto2”;
package example;

message Hello{
required string name = 1;
required int32 id = 2;
}

生成Test1.java
protoc –java_out=pbdir test1.proto

 

2. 将Test1.java拷贝到src/main/java/example目录下

 

3. 通过spark streaming读取kafka上的pb数据
import Test1._

createKafkaStream(ssc, pb_topic, kafkaParams1).map(r => r._2).map(r => {val p = Hello.parseFrom(r.getBytes); p.getId + “\\t” + p.getName})

sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

在build.sbt中添加”org.apache.httpcomponents” %% “httpclient” % “4.5.2”

编译的时候出现报错:

sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

[error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

 

在stackoverflow找到一个解决方法

Change the first %% to a single %. The double character version is for fetching cross-built libraries, and yours isn’t.

去掉一个%,修改如下:

“org.apache.httpcomponents” % “httpclient” % “4.5.2”

 

spark kafka.common.ConsumerRebalanceFailedException

方法1.配置zk问题(kafka的consumer配置)
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10

 

方法2. 在spark读取kafka的代码修改

val kafkaParams = Map(
“zookeeper.connect” -> zkQuorum,
“group.id” -> “default”,
“auto.offset.reset” -> “largest”,
“zookeeper.session.timeout.ms” -> “6000”,
“zookeeper.connection.timeout.ms” -> “6000”,
“zookeeper.sync.time.ms” -> “2000”,
“rebalance.backoff.ms” -> “10000”,
“rebalance.max.retries” -> “20”
)

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic, StorageLevel.MEMORY_ONLY_SER).map(_._2)

hadoop streaming对数据进行排序

环境:  hadoop 1.2, python

有这样的数据,两列,中间是用\\t分隔的,需要按第二列从大到小的顺序排列。

02款雅阁 0.00611111111111
04款奥德赛 0.00813131313131
06ms201 0.000866666666667
06ms201图集 0.00704678362573
06雅阁 0.0145098039216
07常服大衣 0.00915032679739
08年本田思域 0.00111111111111
1.5d弯头 0.0211538461538
1.5匹空调 0.00929292929293
1.5米衣柜设计图 0.01640625

hadoop streaming的python程序写法如下:

hadoop streaming -input datain  -output dataout -mapper cat -reducer cat -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -jobconf stream.num.map.output.key.fieds=2 -jobconf stream.map.output.field.separator=”\\t” -jobconf mapred.text.key.comparator.options=”-k2,2nr”  -jobconf mapred.reduce.tasks=1

其中map, reduce都是操作系统命令cat,org.apache.hadoop.mapred.lib.KeyFieldBasedComparator 来自定义使用key中的部分字段做比较,stream.map.output.field.separator指定map出来的数据按\\t来分隔,stream.num.map.output.key.fieds是指map出来的数据的key/value分隔符在哪,这里将第一列和第二列都作为key部分,mapred.text.key.comparator.options指key中对比的方式, -k2, 2nr是指从第二列到第二列按数值反转排序。

 

Confluent的schema-registry的使用

git clone https://github.com/confluentinc/schema-registry.git

cd schema-registry
git checkout tags/v2.0.0
mvn clean package -DskipTests

vi config/schema-registry.properties
设置kafkastore.connection.url为zookeeper的连接地址

nohup ./bin/schema-registry-start ./config/schema-registry.properties &

查看schema-registry进程
[adadmin@s11 ~]$ jps
26995 NodeManager
74580 Kafka
61079 SchemaRegistryMain
62615 Jps
126392 Worker
26843 DataNode
118141 QuorumPeerMain

#producer 注意:输入一条数据才enter一次,退出使用ctrl + C
./bin/kafka-avro-console-producer –broker-list 10.121.93.50:9092 –topic test –property value.schema='{“type”:”record”,”name”:”myrecord”,”fields”:[{“name”:”f1″,”type”:”string”}]}’
{“f1”: “value1”}
{“f1”: “value2”}
{“f1”: “value3”}

./bin/kafka-avro-console-consumer –broker-list 10.121.93.50:9092 –topic test-avro –from-beginning

spark写入kafka问题

环境:Spark 1.6,  kafka 0.8

Failed to send producer request with correlation id java.io.IOException: Connection reset by peer kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

由于使用spark读取和写入到kafka中,出现以上问题,一直以为是参数性能调整问题,调整不同的参数。

在producer端

producerConf.put(“retry.backoff.ms”, “8000”);
producerConf.put(“message.send.max.retries”, “10”);
producerConf.put(“topic.metadata.refresh.interval.ms”, “0”);
producerConf.put(“fetch.message.max.bytes”, “5252880”)
producerConf.put(“request.required.acks”, “0”)

在broker端 server.properties

message.max.bytes=5252880
replica.fetch.max.bytes=5252880
request.timeout.ms=600000

都无法解决些问题,后来才了解到producer默认的写入的方式是同步,因此问题就是在这一个参数上

producerConf.put(“producer.type”, “async”)