气象站行业新闻

新闻中心

Service support

spark分析某国气象站平均气温实战

一、数据集分析

数据文件按照气象站和日期进行组织,每个气象站都是一个总目录,而且每个气象站下面从 1980 年到 2010 年,每一年又都作为一个子目录。 因为某国有成千上万个气象站,所以整个数据集由大量的小文件组成。通常情况下,处理少量的大型文件更容易、更有效,因此,这些数据需要经过预处理,将每个气象站的数据文件拼接成一个单独的文件。 预处理过的数据文件示例如下所示:

30yr_03103.dat

30yr_03812.dat

30yr_03813.dat

30yr_03816.dat

30yr_03820.dat

30yr_03822.dat

30yr_03856.dat

30yr_03860.dat

30yr_03870.dat

30yr_03872.dat

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

其中03103、03812、03813、03816、03820、03822、03856、03860、03870、03872代表的是气象站编号。

这些数据按行并以 ASCII 格式存储,其中每一行是一条记录。 下面我们展示一行采样数据,其中重要的字段被突出显示。该行数据被分割成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。

数据 含义 所占位数

1998 #year 4

03 #month 3

09 #day 3

17 #hour 3

11 #temperature 6

-100 #dew 6

10237 #pressure 6

60 #wind_direction 6

72 #wind_speed 6

0 #sky_condition 6

0 #rain_1h 6

-9999 #rain_6h 6

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

真实数据如下所示:

2010 09 08 21 200 83 10091 230 72 -9999 0 -9999

  • 1
  • 2

数据集下载:请点击这里.

二、将数据导入到HDFS上

通过HDFS Java API封装了一个本地上传文件到HDFS的工具类HDFSUploadFileUtil,目前支持将本地目录下的所有文件上传到HDFS的某个目录,也可以通过正则表达式去过滤本地目录的文件,将不需要的文件过滤掉。实现代码如下:

private static FileSystem fileSystem = null;

private static FileSystem localFileSystem = null;

public static void uploadFile(String srcDirectory, String tagDirectory, String hdfsUrl, String regex) throws URISyntaxException, IOException {

Configuration conf = new Configuration();

URI uri = new URI(hdfsUrl);

if (fileSystem == null) {

fileSystem = FileSystem.get(uri, conf);

}

if (localFileSystem == null) {

localFileSystem = FileSystem.getLocal(conf);

}

Path path = new Path(hdfsUrl + tagDirectory);

if (fileSystem.exists(path)) {

if (fileSystem.listStatus(path).length > 0) {

fileSystem.close();

throw new IllegalArgumentException("目标目录存在其他文件!");

}

} else {

fileSystem.mkdirs(path);

}

FileStatus[] listStatus = null;

if (regex.length() > 0) {

listStatus = localFileSystem.globStatus(new Path(srcDirectory), new RegexAcceptPathFilter(regex));

} else {

listStatus = localFileSystem.globStatus(new Path(srcDirectory));

}

Path[] sources = FileUtil.stat2Paths(listStatus);

for (Path p : sources) {

fileSystem.copyFromLocalFile(p, path);

System.out.println("文件[" + p.toString() + "]上传成功!");

}

fileSystem.close();

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

三、spark分析数据

我们要计算每个气象站的平均气温,那首先要得到两个东西:气象站编号、气温值大小

气温值大小:通过观察数据,我们知道截取每行数据的14~19位就可以获取到气温值

气象站编号:我们发现,在数据中是获取不到我们所需要的气象站编号,只能通过输入文件的文件名去截取,所以这里的解决方案是通过hadoop的InputSplit去获取文件名,具体代码如下:

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)

val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]

val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{

val file = inputSplit.asInstanceOf[FileSplit]

iterator.map(x => x._2 + " " + file.getPath.toString)

})

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里通过InputSplit获取到输入文件的名称之后,将文件名拼在每行数据的后面,产生新的RDD

然后就可以用截取每行数据获取我们所需要的信息,同时在这里我们可以对数据做校验,过滤掉没用的数据:

val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))

  • 1
  • 2

获取到的数据为:("03812", 22),key为气象站编号,value为气温值

最后,通过强大的combineByKey进行平均值的计算,并打印出来,也可以将输出结果保存到文件中:

val resultRDD = groupRDD.combineByKey(

(temp) => (1, temp),

(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),

(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)

).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

四、完整代码

import org.apache.hadoop.io.{LongWritable, Text}

import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}

import org.apache.spark.rdd.HadoopRDD

import org.apache.spark.{SparkConf, SparkContext}

object WeatherAverage {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("weather-average")

val sc = new SparkContext(conf)

val input = "/data/spark-example/weather/"

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)

val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]

val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{

val file = inputSplit.asInstanceOf[FileSplit]

iterator.map(x => x._2 + " " + file.getPath.toString)

})

val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))

val resultRDD = groupRDD.combineByKey(

(temp) => (1, temp),

(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),

(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)

).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)

sc.stop()

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

五、打包,提交spark任务

通过maven将写好的代码打成jar包,并上传到装有spark的服务器的某个目录上,并运行

bin/spark-submit \

--class cn.oldsix.spark.weather.WeatherAverage \

--master local \

/home/oldsix/app/app-jars/spark/spark-example-0.0.1.jar

  • 1
  • 2
  • 3
  • 4
  • 5

从控制台可以看到如下结果:

 
来源:编辑:author发布时间:2024-02-17