需求:根据 jsp 文件的名字,将各自的访问日志放入到不同的分区文件中,如下:
- 生成的分区文件
- 例如:part-00000 文件中只包含 java.jsp 的访问日志
日志内容:
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/body.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:00 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:55:00 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
代码:
import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.Array
import scala.collection.mutable
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.Partitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
/**
 * Splits a web-server access log into output partitions keyed by the
 * requested page name (e.g. `java.jsp`, `oracle.jsp`).
 *
 * Program arguments:
 *   - `args(0)`: input path of the access log
 *   - `args(1)`: output directory; each record is saved as a `(page, line)` pair
 */
object Demo1 {

  // Captures the text between the first pair of double quotes,
  // e.g. GET /MyDemoWeb/head.jsp HTTP/1.1
  private val QuotedRequest = "\"(.*?)\"".r

  // Captures the token between the first two spaces of the request (the URI),
  // e.g. /MyDemoWeb/head.jsp
  private val RequestUri = " (.*?) ".r

  /**
   * Extracts the page name (text after the last '/') from one log line.
   *
   * @param line a raw access-log line
   * @return the page name, or `None` when the line has no quoted request,
   *         no URI token, or the URI ends with '/' (no page requested)
   */
  private def pageName(line: String): Option[String] =
    for {
      request <- QuotedRequest.findFirstMatchIn(line).map(_.group(1))
      uri     <- RequestUri.findFirstMatchIn(request).map(_.group(1))
      // Use the capture group and trim so the key carries no stray whitespace.
      page = uri.substring(uri.lastIndexOf('/') + 1).trim
      if page.nonEmpty
    } yield page

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: Demo1 <input path> <output path>")
      sys.exit(1)
    }

    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Only fall back to local mode when no master was supplied externally;
    // a hard-coded setMaster would override the --master flag of spark-submit.
    val conf = new SparkConf().setAppName("Demo1").setIfMissing("spark.master", "local")
    val sc = new SparkContext(conf)

    try {
      // Key every log line by its page name; lines without a page are dropped.
      // Cached because two actions below (collect and save) evaluate this RDD.
      val linesByPage = sc.textFile(args(0))
        .flatMap(line => pageName(line).map(page => (page, line)))
        .cache()

      // Collect the distinct page names on the driver: one partition per page.
      val pageNames = linesByPage.keys.distinct().collect()

      linesByPage
        .partitionBy(new MyPartitioner(pageNames))
        .saveAsTextFile(args(1))
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}
/**
 * Custom partitioner that assigns every distinct JSP page name its own
 * partition id, in order of first appearance in `allJSPNames`.
 *
 * Keys that are `null` or not present in `allJSPNames` fall back to partition 0.
 *
 * @param allJSPNames page names to create partitions for; duplicates are ignored
 */
class MyPartitioner(allJSPNames: Array[String]) extends Partitioner {

  // Page name -> partition id. Names are de-duplicated first so that the ids
  // are contiguous (0 until size) and never exceed numPartitions - 1.
  val partitionMap: mutable.HashMap[String, Int] =
    mutable.HashMap(allJSPNames.distinct.zipWithIndex: _*)

  // Number of partition ids handed out; kept as a public member for
  // backward compatibility.
  var partID: Int = partitionMap.size

  /** Returns the partition id for `key`, or 0 for null/unknown keys. */
  override def getPartition(key: Any): Int =
    Option(key).flatMap(k => partitionMap.get(k.toString)).getOrElse(0)

  /** Total number of partitions; at least 1 so the fallback partition 0 exists. */
  override def numPartitions: Int = math.max(partitionMap.size, 1)

  // Two partitioners with the same name -> id mapping partition identically.
  override def equals(other: Any): Boolean = other match {
    case that: MyPartitioner => that.partitionMap == partitionMap
    case _                   => false
  }

  override def hashCode(): Int = partitionMap.hashCode()
}
提交到 Spark 集群上运行:spark-submit --class xxx.Demo1 --master spark://qujianlei:7077 xxx.jar ~/inputpath ~/outputpath