1、核心工具类
package junit;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.MultiFieldQueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.highlight.Formatter;
import org.apache.lucene.search.highlight.Fragmenter;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.QueryScorer;
import org.apache.lucene.search.highlight.SimpleFragmenter;
import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.junit.Test;
import org.wltea.analyzer.lucene.IKAnalyzer;
import com.ljq.entity.Person;
import com.ljq.utils.HBaseAPI;
import com.ljq.utils.LuceneUtil;
import com.ljq.utils.XMLPropertyConfig;
/**
* 模拟功能:假如从互联网上采集数据,采集到的数据实时存储到HBase数据库上,<br/>
* 现需把这些数据从HBase读取出来,并在本地创建lucene索引,<br/>方便前端通过索引进行查询,把查询取得的结果在界面上展现出来。<br/><br/>
*
* 要求该功能要7*24不间断执行,实时性要求非常高,故采用Storm实时计算框架为载体,实现上面的模拟功能。
*
* @author 林计钦
* @version 1.0 2013-6-5 下午05:36:09
*/
public class IndexCreationTest {
    /** Collects the id of every search hit so searchIndex() can dump them to a text file. */
    private static StringBuffer ids = new StringBuffer();
    private static IndexWriter indexWriter = LuceneUtil.getIndexWriter();
    // Commit-timing parameters.
    private static long _initStartTime = 0;  // system time (ms) of the last index commit
    private static long mTimeForCommit = 60; // commit at least once per this many seconds
    // Counters.
    private static int mIndexCommitBatchNum = 100; // also commit after this many new documents
    private static int mDocCount = 0;              // documents indexed since the last commit

    /**
     * Reads rows from HBase in (near) real time and builds a local Lucene index from them.
     */
    @Test
    public void createIndexFromHBase() {
        readTable("1370422328578");
    }

    /**
     * Queries the index for a range of numeric keywords and writes all matched ids
     * to a local text file.
     *
     * @throws IOException if the result file cannot be written
     */
    @Test
    public void searchIndex() throws IOException {
        for (int i = 0; i < 500; i++) {
            search(new String[]{"id", "name", "age", "rowKey"}, i + "");
        }
        // Dump the collected ids; close the writer even if write() throws (was leaked before).
        BufferedWriter writer = new BufferedWriter(new FileWriter(new File("E:\\123.txt")));
        try {
            writer.write(ids.toString());
        } finally {
            writer.close();
        }
    }

    /**
     * Continuously scans an HBase table starting at the given row key and indexes every
     * row it sees. Runs forever (7x24); each batch advances startRow past the rows
     * already consumed, and the index is committed when isCommitTime() says so.
     *
     * @param startRow row key (timestamp) to start scanning from
     */
    public static void readTable(String startRow) {
        int i = 0;
        try {
            while (true) {
                // Example row: [{"timestamp":"1370422360734","id":"950","name":"lin950","age":"950","row":"1370422507578"}]
                System.out.println("startRow=" + startRow);
                List<Map<String, String>> datas = HBaseAPI.scan("tb_stu", null,
                        String.valueOf(startRow), 100);
                // BUGFIX: null-check BEFORE dereferencing (the original called datas.size() first).
                if (datas != null && !datas.isEmpty()) {
                    i += datas.size();
                    System.out.println(i);
                    for (Map<String, String> data : datas) {
                        String row = data.get(HBaseAPI.HBASE_ROW);
                        startRow = row; // advance the scan cursor for the next batch
                        createIndex(data.get("id"), data.get("name"), data.get("age"), row);
                    }
                }
                if (isCommitTime()) {
                    System.out.println("indexCommit");
                    try {
                        indexWriter.commit();      // batch commit
                        indexWriter.forceMerge(1); // forceMerge replaces the removed optimize()
                        System.out.println("indexWriter.commit();");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("indexCommit fin");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Adds (or, once the index exists, updates) one person document.
     * Rows with any blank field are logged and skipped.
     *
     * @param id     person id; also used as the update key
     * @param name   person name (analyzed field)
     * @param age    person age
     * @param rowKey HBase row key the data came from
     */
    private static void createIndex(String id, String name, String age, String rowKey) {
        if (StringUtils.isBlank(id) || StringUtils.isBlank(name) || StringUtils.isBlank(age)
                || StringUtils.isBlank(rowKey)) {
            System.out.println(String.format("id:%s, name:%s, age:%s, rowKey:%s.",
                    id, name, age, rowKey));
            return;
        }
        try {
            Document doc = new Document();
            doc.add(new Field("id", id, Field.Store.YES, Field.Index.NOT_ANALYZED));
            doc.add(new Field("name", name, Field.Store.YES, Field.Index.ANALYZED));
            doc.add(new Field("age", age, Field.Store.YES, Field.Index.NOT_ANALYZED));
            doc.add(new Field("rowKey", rowKey, Field.Store.YES, Field.Index.NOT_ANALYZED));
            if (LuceneUtil.existsIndex()) {
                // Index already exists: replace any previous document with the same id.
                System.out.println("---update index---");
                indexWriter.updateDocument(new Term("id", id), doc);
            } else {
                // First run: plain add.
                System.out.println("---create index---");
                indexWriter.addDocument(doc);
            }
            countDocument(); // BUGFIX: the batch counter was never incremented before
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Synchronized increment so the doc counter is consistent with isCommitTime()'s lock. */
    private static synchronized void countDocument() {
        mDocCount++;
    }

    /**
     * Decides whether the index should be committed now: either the time interval has
     * elapsed or enough documents have accumulated since the last commit.
     * <p>BUGFIX: the original tested {@code mDocCount % mIndexCommitBatchNum == 0}, which is
     * true while the (never-incremented) counter was 0 — so every loop iteration committed.
     *
     * @return true when a commit is due
     */
    private static synchronized boolean isCommitTime() {
        long now = System.currentTimeMillis();
        boolean intervalElapsed = (now - _initStartTime) >= (mTimeForCommit * 1000);
        boolean batchFull = mDocCount >= mIndexCommitBatchNum;
        if (intervalElapsed || batchFull) {
            _initStartTime = now;
            mDocCount = 0;
            return true;
        }
        return false;
    }

    /**
     * Searches the local index over the given fields and prints each hit with the
     * matched terms highlighted in red HTML font tags.
     *
     * @param fields  index fields to query (multi-field query)
     * @param keyword query string
     */
    private void search(String[] fields, String keyword) {
        IndexReader indexReader = null;
        IndexSearcher indexSearcher = null;
        try {
            // Open a read-only searcher over the configured index directory.
            indexReader = IndexReader.open(FSDirectory.open(new File(XMLPropertyConfig.getConfigXML()
                    .getString("index_path"))), true);
            indexSearcher = new IndexSearcher(indexReader);
            IKAnalyzer analyzer = new IKAnalyzer(); // one instance, reused for parse + highlight
            MultiFieldQueryParser queryParser = new MultiFieldQueryParser(Version.LUCENE_35,
                    fields, analyzer);
            Query query = queryParser.parse(keyword);
            // Fetch at most the first 1000 hits.
            TopDocs topDocs = indexSearcher.search(query, 1000);
            // SimpleHTMLFormatter wraps each matched term in the given pre/post tags.
            Formatter formatter = new SimpleHTMLFormatter("<font color='red'>", "</font>");
            // QueryScorer ranks fragments using the terms extracted from the (rewritten) query.
            QueryScorer fragmentScorer = new QueryScorer(query);
            Highlighter highlighter = new Highlighter(formatter, fragmentScorer);
            // SimpleFragmenter splits the source text into fixed-size fragments (100 chars).
            Fragmenter fragmenter = new SimpleFragmenter(100);
            highlighter.setTextFragmenter(fragmenter);
            ScoreDoc[] scoreDocs = topDocs.scoreDocs;
            for (ScoreDoc scDoc : scoreDocs) {
                Document document = indexSearcher.doc(scDoc.doc);
                String id = document.get("id");
                String name = document.get("name");
                String age = document.get("age");
                String rowKey = document.get("rowKey");
                float score = scDoc.score; // similarity score
                // getBestFragment returns null when the field has no match — fall back to raw value.
                String lighterName = highlighter.getBestFragment(analyzer, "name", name);
                if (null == lighterName) {
                    lighterName = name;
                }
                String lighterAge = highlighter.getBestFragment(analyzer, "age", age);
                if (null == lighterAge) {
                    lighterAge = age;
                }
                Person person = new Person();
                person.setId(NumberUtils.toLong(id));
                person.setName(lighterName);
                person.setAge(NumberUtils.toInt(age));
                ids.append(id).append("\r\n"); // BUGFIX: was "\n\r" (reversed CRLF)
                System.out.println(String.format("id:%s, name:%s, age:%s, rowKey:%s, 相似度:%s.",
                        id, lighterName, lighterAge, rowKey, score));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // BUGFIX: null-guard — the original NPE'd here if open() threw before assignment,
            // and never closed the reader (searcher.close() does not close a caller-supplied reader).
            try {
                if (indexSearcher != null) {
                    indexSearcher.close();
                }
                if (indexReader != null) {
                    indexReader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
2、LuceneUtil类->Lucene工具类
/**
* lucene工具类,采用IKAnalyzer中文分词器
*
* @author 林计钦
* @version 1.0 2013-6-3 下午03:51:29
*/
/**
 * Lucene helper backed by the IKAnalyzer Chinese analyzer.
 *
 * @author 林计钦
 * @version 1.0 2013-6-3 下午03:51:29
 */
public class LuceneUtil {
    /** Index directory path, read from the external XML configuration. */
    private static final String indexPath = XMLPropertyConfig.getConfigXML()
            .getString("index_path");
    private static final Logger log = Logger.getLogger(LuceneUtil.class);

    /**
     * Opens an IndexWriter on the configured index directory, creating the
     * directory if necessary and clearing any stale write lock first.
     *
     * @return an open writer, or null if opening failed (callers must null-check)
     */
    public static IndexWriter getIndexWriter() {
        try {
            File indexFile = new File(indexPath);
            if (!indexFile.exists()) {
                // BUGFIX: mkdirs (was mkdir) — create missing parent directories too,
                // matching existsIndex() below.
                indexFile.mkdirs();
            }
            Directory fsDirectory = FSDirectory.open(indexFile);
            IndexWriterConfig confIndex = new IndexWriterConfig(Version.LUCENE_35, new IKAnalyzer());
            confIndex.setOpenMode(OpenMode.CREATE_OR_APPEND);
            // Remove a write lock left behind by a crashed process.
            if (IndexWriter.isLocked(fsDirectory)) {
                IndexWriter.unlock(fsDirectory);
            }
            return new IndexWriter(fsDirectory, confIndex);
        } catch (Exception e) {
            // Use the class logger instead of printStackTrace so failures reach the log files.
            log.error("Failed to open IndexWriter on " + indexPath, e);
        }
        return null;
    }

    /**
     * Tells whether the index has been created at least once.
     *
     * @return true if the index exists, false otherwise
     * @throws Exception never in practice; kept for interface compatibility
     */
    public static boolean existsIndex() throws Exception {
        File dir = new File(indexPath);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        // segments.gen is written on the first commit, so its presence marks a real index.
        return new File(indexPath, "segments.gen").exists();
    }
}
3、HBaseAPI类->HBase数据库封装类
/**
* HBase数据库封装类
*
* @author 林计钦
* @version 1.0 2013-6-4 上午11:02:17
*/
public class HBaseAPI {
    /** Map key for the row key. */
    public static final String HBASE_ROW = "row";
    /** Map key for the column family. */
    public static final String HBASE_FAMILY = "family";
    /** Map key for the column qualifier (column name). */
    public static final String HBASE_QUALIFIER = "qualifier";
    /** Map key for the column value. */
    public static final String HBASE_QUALIFIERVALUE = "qualifiervalue";
    /** Map key for the cell timestamp. */
    public static final String HBASE_TIMESTAMP = "timestamp";
    /** Size of the shared HTable pool. */
    public static int poolSize = 1000;
    public static Configuration conf;
    private static HTablePool tablePool = null;
    private static final Logger log = Logger.getLogger(HBaseAPI.class);

    static {
        // Connection settings mirrored from $HBASE_HOME/conf/hbase-site.xml via our XML config.
        conf = new Configuration();
        conf.set("hbase.master.port", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_master_port"));
        conf.set("hbase.zookeeper.quorum", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_zookeeper_quorum"));
        conf.set("hbase.zookeeper.property.clientPort", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_zookeeper_property_clientPort"));
    }

    /**
     * Returns the shared HTablePool, creating it on first use.
     * <p>HTablePool is preferred over raw HTable for CRUD: HTable is not thread-safe,
     * while the pool maintains a fixed number of HTable instances reused across threads.
     * <p>BUGFIX: synchronized so concurrent first calls cannot race and create two pools
     * (the original lazy init was unsynchronized).
     *
     * @return the process-wide table pool
     */
    public static synchronized HTablePool getHTablePool() {
        if (tablePool == null) {
            tablePool = new HTablePool(conf, poolSize);
        }
        return tablePool;
    }

    /**
     * Scans tableName starting at startRow and returns up to maxCount rows, each row
     * flattened into a map of qualifier → value plus the "row" and "timestamp" entries.
     *
     * @param tableName  table to scan
     * @param filterList optional server-side filters (may be null)
     * @param startRow   row key (timestamp) to start from; null/blank means table start
     * @param maxCount   maximum rows to return; values &lt;= 0 or &gt; 10000 are clamped to 10000
     * @return e.g. [{"timestamp":"1370412537880","id":"1","name":"zhangsan","age":"20","row":"quanzhou"}]
     */
    public static List<Map<String, String>> scan(String tableName, FilterList filterList, String startRow, int maxCount) {
        List<Map<String, String>> datas = new ArrayList<Map<String, String>>();
        ResultScanner rs = null;
        HTable table = null;
        try {
            table = (HTable) getHTablePool().getTable(tableName);
            Scan scan = new Scan();
            if (filterList != null) {
                scan.setFilter(filterList);
            }
            if (startRow != null && !"".equals(startRow.trim())) {
                scan.setStartRow(Bytes.toBytes(startRow));
            }
            // Clamp maxCount into (0, 10000].
            if (maxCount <= 0 || maxCount > 10000) {
                maxCount = 10000;
            }
            scan.setCaching(maxCount + 1); // fetch the whole batch in one round trip
            rs = table.getScanner(scan);
            // Result exposes its cells via raw(), list() or getValue(family, qualifier); list() is used here.
            for (Result r : rs) {
                HashMap<String, String> map = new HashMap<String, String>();
                long timestamp = 0;
                for (KeyValue kv : r.list()) {
                    timestamp = kv.getTimestamp();
                    String qualifier = Bytes.toString(kv.getQualifier()); // column name
                    String value = Bytes.toString(kv.getValue());         // column value
                    map.put(qualifier, value);
                }
                map.put(HBASE_ROW, Bytes.toString(r.getRow()));
                map.put(HBASE_TIMESTAMP, "" + timestamp);
                datas.add(map);
                // Stop once the requested row count is reached.
                if (datas.size() >= maxCount) {
                    break;
                }
            }
        } catch (Exception e) {
            log.error("scan of table " + tableName + " failed", e);
        } finally {
            if (rs != null) {
                rs.close();
            }
            // BUGFIX: release the pooled table (the original leaked it — close() was commented out).
            if (table != null) {
                try {
                    table.close();
                } catch (Exception e) {
                    log.error("failed to release table " + tableName, e);
                }
            }
        }
        return datas;
    }
}
标签:lucene,String,org,Lucene,static,new,import,HBase
From: https://blog.51cto.com/u_2650279/6154911