
Running mem-data-analysis-framework


Download: wget https://mem:[email protected]/mem-prototype/mem-data-analysis-framework.tar.gz
docker load < mem-data-analysis-framework.tar.gz
docker run --network="host" -t -d mem-prototype:latest

 

Then open localhost:8080 in a browser and you can see the code.

 

Upload data into the Docker container:

sudo docker cp x.txt e66d9d295e54:/tmp/
[sudo] password for kali:

root@kali:/work# ls /tmp/
hsperfdata_root  spark-events  x.txt  zeppelin-index
You can see the data is now under /tmp.

 

Compute similarities, then "cluster" the server behaviors with k-NN based on those similarities:

%spark.pyspark
# Returns all edges between server behaviors where k versions match.
# For the remaining client hellos without matches, one of the responses has to be a timeout. 
def get_core(versions, k):
    versions = versions.select(f.col('id').alias('id_a'), f.col('versions').alias('v_a'))\
        .join(versions.select(f.col('id').alias('id_b'), f.col('versions').alias('v_b')), f.col('id_a') <= f.col('id_b'))\
        .filter(version_match(f.col('v_a'), f.col('v_b'), f.lit(k)))
    print(f'threshold: {k}, version matches: {versions.count()}')
    return versions


versions = df_only_fp.select(f.col('versions'),f.col('timeouts')).distinct().sort(f.col('versions').asc()).withColumn("id", f.monotonically_increasing_id())
versions.write.mode('overwrite').parquet(f'/tmp/versions.parquet')
versions = spark.read.parquet(f'/tmp/versions.parquet')

versions = versions.filter(f.col('timeouts')<=5)

for i in range(0,9):
    tmp_versions = get_core(versions, 24-i)
    edges = calculate_similarites(df_only_fp, tmp_versions)
    edges.write.mode('overwrite').parquet(f'/tmp/fp_distances_raw_{24-i}_new.parquet')
    edges = spark.read.parquet(f'/tmp/fp_distances_raw_{24-i}_new.parquet')
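
The helpers version_match and calculate_similarites are defined elsewhere in the notebook. As a rough orientation only, here is a minimal sketch of what version_match might look like, assuming v_a/v_b are arrays of per-client-hello response versions and that a timed-out response is encoded by some placeholder value (both are assumptions, not the framework's actual code):

%spark.pyspark
# Sketch only -- the real version_match is defined elsewhere in the notebook.
# Assumptions: v_a/v_b are arrays of per-client-hello response versions and
# TIMEOUT is whatever placeholder marks a timed-out response.
from pyspark.sql import functions as f
from pyspark.sql.types import BooleanType

TIMEOUT = 'timeout'  # hypothetical placeholder value

@f.udf(BooleanType())
def version_match(v_a, v_b, k):
    # Count positions where both behaviors answered with the same version.
    matches = sum(1 for a, b in zip(v_a, v_b) if a == b)
    if matches < k:
        return False
    # Every remaining (non-matching) position must involve a timeout on one side.
    return all(a == b or a == TIMEOUT or b == TIMEOUT for a, b in zip(v_a, v_b))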



%spark.pyspark
window = Window.partitionBy('fp_a').orderBy(col('similarity').desc_nulls_last())

# The functions temporarily store results for faster processing afterwards

def generate_knndf(fp_distances, k, file_name):
    if k != 0:
        knn_df = fp_distances.withColumn('row', f.rank().over(window)).where(col('row') <= k).drop('row')
    else:
        knn_df = fp_distances
    knn_df.write.mode('overwrite').parquet(os.path.join('/tmp/', file_name))
    return knn_df
    
def generate_simple_knndf(knn_df, file_name):
    s_knn_df = knn_df.withColumn('fp_1', f.when(col('fp_a') < col('fp_b'), col('fp_a')).otherwise(col('fp_b'))).withColumn('fp_2', f.when(col('fp_1') == col('fp_b'), col('fp_a')).otherwise(col('fp_b'))).drop('fp_a').drop('fp_b').withColumnRenamed('fp_1', 'fp_a').withColumnRenamed('fp_2', 'fp_b').distinct()
    s_knn_df.write.mode('overwrite').parquet(os.path.join('/tmp/', 's_' + file_name))
    
def generate_g_knndf(knn_df, file_name):
    gf_knn = GraphFrame(df_only_fp.select(col('fingerprint_all').alias('id')), knn_df.select(col('fp_a').alias('src'), col('fp_b').alias('dst')))
    g_knn_df = gf_knn.connectedComponents().select(col('id').alias('fingerprint_all'), 'component')
    g_knn_df.write.mode('overwrite').parquet(os.path.join('/tmp/', 'g_' + file_name))

def generate(k):
    file_name = f'{k}_knn_df.parquet'
    knn_df = generate_knndf(fp_distances_raw, k, file_name)
    knn_df = spark.read.parquet(os.path.join('/tmp/', file_name))
    generate_simple_knndf(knn_df, file_name)
    generate_g_knndf(knn_df, file_name)
    

# Read k-NN graph files
def get_knn(k):
    file_name = f'{k}_knn_df.parquet'
    if not os.path.exists(os.path.join('/tmp/', file_name)):
        raise ValueError()
    knn_df = spark.read.parquet(os.path.join('/tmp/', file_name))
    if not os.path.exists(os.path.join('/tmp/', 's_' + file_name)):
        raise ValueError()
    s_knn_df = spark.read.parquet(os.path.join('/tmp/', 's_' + file_name))
    if not os.path.exists(os.path.join('/tmp/', 'g_' + file_name)):
        raise ValueError()
    g_knn_df = spark.read.parquet(os.path.join('/tmp/', 'g_' + file_name))
    component_count = g_knn_df.select('component').distinct().count()
    return knn_df, s_knn_df, g_knn_df, component_count
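
Putting these helpers together, a usage sketch (the parquet file name below is an assumption based on the loop in the previous paragraph; fp_distances_raw is the global that generate reads):

%spark.pyspark
# Usage sketch: build the k-NN graphs for a few values of k and inspect them.
# The input file name is an assumption based on the loop in the previous paragraph.
fp_distances_raw = spark.read.parquet('/tmp/fp_distances_raw_24_new.parquet')

for k in (1, 3, 5):
    generate(k)

knn_df, s_knn_df, g_knn_df, component_count = get_knn(3)
print(f'k=3: {component_count} connected components')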

 

2-Analyze-FPs

Processing the parsed output:

def read_parsed_scan(scan, label):
    file_dir = os.path.join(scan, "certs.parquet")
    certs = spark.read.parquet(file_dir)
    file_dir = os.path.join(scan, "fingerprints.parquet")
    fps = spark.read.parquet(file_dir)
    file_dir = os.path.join(scan, "http.parquet")
    http = spark.read.parquet(file_dir)
    file_dir = os.path.join(scan, "tls_verbose.parquet")
    tls = spark.read.parquet(file_dir)
    
    file_dir = os.path.join(scan, "input.parquet")
    labeled_input = spark.read.parquet(file_dir)
    
    file_dir = os.path.join(scan, "hosts.parquet")
    hosts = spark.read.parquet(file_dir)
    hosts = hosts.withColumn('label', f.lit(label))
    hosts = hosts.fillna("empty", ['server_name'])
    
    return labeled_input, hosts, tls, fps, certs, http

Input sources:

bl_labeled_input, bl_hosts, bl_tls, bl_fps, bl_certs, bl_http = read_parsed_scan("/data/blocklist-parsed", "blocklist")
tl_labeled_input, tl_hosts, tl_tls, tl_fps, tl_certs, tl_http = read_parsed_scan("/data/toplist-parsed", "toplist")
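
As a quick sanity check (a sketch; the counts depend on the scan data), the parsed blocklist tables can be counted directly:

%spark.pyspark
# Sketch: row counts of the parsed blocklist tables.
for name, df in [('hosts', bl_hosts), ('tls', bl_tls), ('fps', bl_fps), ('certs', bl_certs)]:
    print(name, df.count())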

 

Input paths:

/data/blocklist-parsed

/data/toplist-parsed

Output with the default data:

Top 5 behaviors from block lists (based on number of targets):
+--------------------+-------+--------------+---------------------+--------------------+
|     fingerprint_all|targets|Distinct Ports|Distinct IP addresses| collect_set(labels)|
+--------------------+-------+--------------+---------------------+--------------------+
|771_c030_65281.-1...|     57|             4|                   57|[[Dridex], [Emotet]]|
|771_9c_65281.-35....|     51|            10|                   49|          [[QakBot]]|
|771_c02f_65281.-1...|     38|            14|                   38|          [[Dridex]]|
|771_c02f_65281.-1...|     36|            19|                   36|          [[Dridex]]|
|769_c014_65281.-1...|     22|             2|                   22|        [[TrickBot]]|
+--------------------+-------+--------------+---------------------+--------------------+

Top 5 behaviors from top lists (based on number of targets):
+--------------------+-------+----------------+---------------------+
|     fingerprint_all|targets|Distinct Domains|Distinct IP addresses|
+--------------------+-------+----------------+---------------------+
|771_1301_51.29-43...|    272|              55|                  260|
|771_1301_43.AwQ-5...|    128|              24|                  118|
|771_1301_51.29-43...|    110|              31|                  110|
|771_1301_51.29-43...|     96|              25|                   74|
|771_1301_51.29-43...|     93|              38|                   65|
+--------------------+-------+----------------+---------------------+
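
The notebook computes these tables itself; as a rough sketch of the kind of aggregation involved (the joined dataframe df_bl with columns 'fingerprint_all', 'ip', 'port' and 'labels' is an assumption, not the notebook's actual variable):

%spark.pyspark
# Sketch (not the notebook's exact code): aggregate targets per server behavior.
# Assumes df_bl has one row per scanned blocklist target with columns
# 'fingerprint_all', 'ip', 'port' and 'labels'.
top_bl = df_bl.groupBy('fingerprint_all')\
    .agg(f.count('*').alias('targets'),
         f.countDistinct('port').alias('Distinct Ports'),
         f.countDistinct('ip').alias('Distinct IP addresses'),
         f.collect_set('labels'))\
    .orderBy(f.col('targets').desc())
top_bl.show(5, truncate=20)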

 

 

The data above comes from the first step, 1-Parse-Scan. Let's look at its inputs and outputs:

def parse(input_dir, tmp_dir, out, toplist=False):
    # Load and save all input csv files as parquets
    with ThreadPool(processes=4) as pool:
        def process_file(t):
            file = os.path.join(input_dir, t[0])
            try:
                df_tmp = load_csv(file, schema=t[1])
                save_to_parquet(out, df_tmp, t[2], 4)
            except FileNotFoundError:
                logging.info(f'Skipping {t}')
            except Exception as err:
                logging.exception('Could not save csv', exc_info=err)
        jobs = []
        r = pool.map_async(process_file, [
            ('tls_verbose.csv', TLS_VERBOSE_DF_SCHEMA, 'tls_verbose'),
            ('http.csv', HTTP_DF_SCHEMA, 'http'),
            ('certs.csv', CERTS_DF_SCHEMA, 'certs'),
            ('labeled-input.csv', INPUT_DF_SCHEMA, 'input_tmp')
        ], chunksize=1)
        jobs.append(r)
        
        
        hosts_csv = load_csv(os.path.join(input_dir, 'hosts.csv'), schema=HOSTS_DF_SCHEMA)
        df_ip = hosts_csv.select('ip').distinct().mapInPandas(get_map_pandas_add_as(
            os.path.join(input_dir.replace('file://', ''), '..', 'pyasn.dat'),
            os.path.join(input_dir.replace('file://', ''), '..', 'pyasn.asnames.json')), ADD_AS_SCHEMA)
        hosts_csv = hosts_csv.join(df_ip, on='ip', how='left_outer')
        r = pool.apply_async(lambda: save_to_parquet(out, hosts_csv, 'hosts', 4))
        jobs.append(r)
        
        for p in jobs:
            p.get()


    # Reload for faster processing
    hosts_csv = spark.read.parquet(os.path.join(out, 'hosts.parquet'))
    input_csv = spark.read.parquet(os.path.join(out, 'input_tmp.parquet'))
    tls_csv = spark.read.parquet(os.path.join(out, 'tls_verbose.parquet'))
    
    
    # Join input for scan ids
    if not toplist:
        input_csv = input_csv.join(hosts_csv.select('id', 'server_name', 'ip','port'), (input_csv.ip.eqNullSafe(hosts_csv.ip)) & (input_csv.server_name.eqNullSafe(hosts_csv.server_name)) & (input_csv.port.eqNullSafe(hosts_csv.port)))\
            .select(input_csv.ip, input_csv.server_name, input_csv.rank, input_csv.label, input_csv.port, input_csv.list, hosts_csv.id).distinct()
    else:
        input_csv = input_csv.join(hosts_csv.select('id', 'server_name', 'ip','port'), (input_csv.server_name.eqNullSafe(hosts_csv.server_name)))\
            .select(hosts_csv.ip, input_csv.server_name, input_csv.rank, input_csv.label, hosts_csv.port, input_csv.list, hosts_csv.id).distinct()
    
    save_to_parquet(out, input_csv, 'input', 4)

    splittext = f.udf(lambda FullPath: FullPath.split('.')[0], StringType())
    
    # Compute TLS Fingerprints
    hosts_csv = hosts_csv.fillna('default', subset='client_hello').withColumn('client_hello_simple', splittext("client_hello"))
    hosts_csv = hosts_csv.drop('client_hello').withColumnRenamed('client_hello_simple', 'client_hello')
    
    pivot_src_df = hosts_csv.join(tls_csv, on='id').select('id', 'ip', 'port', 'server_name', 'client_hello', 'fingerprint')

    # Compute combined Fingerprints
    client_hellos_escaped = sorted(pivot_src_df.select('client_hello').distinct().rdd.map(lambda r: r[0]).collect())


    fingerprint_df = pivot_src_df.groupBy('ip', 'port', 'server_name').pivot('client_hello', client_hellos_escaped).agg(f.first('fingerprint'))
    

    fingerprint_df = fingerprint_df.withColumn('fingerprint_all', f.array_join(remove_status_request(f.array(*client_hellos_escaped)), delimiter='|', null_replacement='______<255'))
    
    #fingerprint_df = add_fingerprint_col(fingerprint_df, 'fingerprint_all', client_hellos)
    
    # Save FP joined
        
    save_to_parquet(out, fingerprint_df, 'fingerprints', 4, partition_columns=['ip', 'port'])

 

parse("/data/blocklist", "/tmp", "/data/blocklist-parsed")

parse("/data/toplist", "/tmp", "/data/toplist-parsed", toplist=True)

So the inputs live under /data, and the outputs are exactly /data/blocklist-parsed and /data/toplist-parsed.
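
load_csv and save_to_parquet are helpers defined elsewhere in the framework. Minimal sketches compatible with how parse() calls them might look like this (assumptions, not the framework's actual code):

%spark.pyspark
# Sketch: minimal stand-ins for the framework's CSV/parquet helpers.
import os

def load_csv(path, schema):
    # Read a CSV file with an explicit schema and a header row (header assumed).
    return spark.read.csv(path, schema=schema, header=True)

def save_to_parquet(out_dir, df, name, num_partitions, partition_columns=None):
    # Repartition and write <name>.parquet under the output directory.
    writer = df.repartition(num_partitions).write.mode('overwrite')
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    writer.parquet(os.path.join(out_dir, f'{name}.parquet'))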

 

Below is the result of running it after changing blocklist-parsed to blocklist-parsed2:

/data# ls
blocklist         blocklist-parsed2  pyasn.asnames.json  toplist         toplist-parsed2
blocklist-parsed  blocklist.zip      pyasn.dat           toplist-parsed  toplist.zip

 

Enter the container's bash shell to look at the data (the container ID 322c74ee1119 comes from docker ps):

sudo docker exec -it 322c74ee1119 /bin/bash

 

Let's look at what is inside blocklist:

/data/blocklist# ls
blocklist.input  certs.csv  http.csv          labeled-input.csv  tls_verbose.csv
cert_chain.csv   hosts.csv  http_verbose.csv  log

Now look at the contents of hosts.csv, a little over 50,000 records in total:

[Screenshot: contents of the blocklist hosts.csv]

 

 

Now look at what is inside toplist:

/data/toplist# ls
alexa.input     certs.csv  http.csv          labeled-input.csv  tls_verbose.csv
cert_chain.csv  hosts.csv  http_verbose.csv  log

The hosts file contents, a little over 60,000 records in total:

 

[Screenshot: contents of the toplist hosts.csv]

 

The output of the third part:

[Screenshot: output of part 3]

 

 

 

 

From: https://blog.51cto.com/u_11908275/6384736
