Gnoti

빅데이터 분석 with Machine Learning

#1 overview

1985년부터 2016년까지 각 국의 기본 데이터를 토대로 자살율에 대한 몇 가지 기본적인 분석을 수행 합니다.

#2 prepare lib

분석을 위한 라이브러리를 준비합니다

#3 prepare data

#3.1 데이터셋의 내용을 확인합니다.

#3.2 Superset 통해 확인합니다.

#4 Describe data

NaN 값을 제외하고 데이터 집합의 분포의 중심 경향, 분산 및 모양을 요약하는 설명 통계를 생성합니다.

#5 데이터 집합 피처들간의 상관 관계

#6 Analysis

#6.1 성별과 연령대에 따른 자살자

성별에 따라 연령대가 다른 사람들이 자살 한 수를 보여줍니다.

여기에서 자살율이 여성과 남성 모두에서 35-54 세 연령 그룹에서 더 높으며 5-14 세 연령 그룹에서 가장 낮다는 것을 알 수 있습니다

#6.2 성별과 세대에 따른 자살자

성별에 따른 다른 세대 사람들의  자살자 수를 보여줍니다.  

자살자 수가 Boomers세대에서 더 높은 반면, Generation Z에서 가장 낮다는 것을 알 수있다.

#6.3 연도별 자살자 수의 변화 추이

연도별로 연령대가 다른 사람들의 자살 수의 변화를 보여줍니다

#6.4 연도별, 성별(남여) 자살자 수의 추이

연도별 남성과 여성 모두에 대한 자살 수의 변화를 보여줍니다

빅데이터 분석을 위한 준비 Kafka + Python 연결

Kafka + Python 연결 테스트

#1 Start zookeeper & kafka

[root@zepp zookeeper-3.4.14]# vi conf/zoo.cfg

[root@zepp zookeeper-3.4.14]# bin/zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /home/min/zookeeper-3.4.14/bin/../conf/zoo.cfg

Starting zookeeper … STARTED

 

[root@zepp kafka_2.12-2.2.0]# bin/kafka-server-start.sh config/server.properties

[2019-04-19 18:12:03,572] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

[2019-04-19 18:12:04,104] INFO starting (kafka.server.KafkaServer)

[2019-04-19 18:12:04,105] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)

[2019-04-19 18:12:04,128] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)

#2 create topic

[root@zepp kafka_2.12-2.2.0]# ./bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic suicides

Created topic suicides.

[root@zepp kafka_2.12-2.2.0]# bin/kafka-topics.sh –list –zookeeper localhost

suicides

#3 python용 kafka 설치

Collecting kafka

  Downloading https://files.pythonhosted.org/packages/21/71/73286e748ac5045b6a669c2fe44b03ac4c5d3d2af9291c4c6fc76438a9a9/kafka-1.3.5-py2.py3-none-any.whl (207kB)

    100% |████████████████████████████████| 215kB 12.2MB/s

Installing collected packages: kafka

Successfully installed kafka-1.3.5

(base) [min@zepp ~]$

#4 verify message

[root@zepp kafka_2.12-2.2.0]# bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic suicides

#5  python source

#6 Result

빅데이터 분석을 위한 준비 Kafka

Kafka는 대용량 데이터를 위한 분산형 스트리밍 플랫폼입니다.

Kafka clustring Test 수행해 봅니다.

전체Archtecture

#1 ip setting

3대의 Centos 7.3 준비

server.1=192.168.0.145:2888:3888
server.2=192.168.0.174:2888:3888
server.3=192.168.0.175:2888:3888

#2 setting zookeeper & kafka

## download zookeeper & kafka
wget http://mirror.navercorp.com/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz tar zxvf kafka_2.12-2.2.0.tgz mv kafka_2.12-2.2.0 kafka

configure zookeeper  (1,2,3 server)[root@kafka1 min]# vi kafka/config/zookeeper.properties

# zookeeper servers
server.1=192.168.0.145:2888:3888
server.2=192.168.0.174:2888:3888
server.3=192.168.0.175:2888:3888

### configure kafka
[root@kafka1 min]# vi kafka/config/server.properties

broker.id=1 ## id를 server별로 설정
listeners=PLAINTEXT://:9092
zookeeper.connect=192.168.0.145:2181,192.168.0.174:2181,192.168.0.175:2181
delete.topic.enable=true

#3 start zookeeper & kafka

#### start zookeeper server and kafka server

[root@kafka1 min]# kafka/bin/zookeeper-server-start.sh -daemon ./kafka/config/zookeeper.properties

[root@kafka1 min]# kafka/bin/kafka-server-start.sh -daemon ./kafka/config/server.properties

#4 Verify all brokers

[root@kafka1 min]#  ./kafka/bin/zookeeper-shell.sh 192.168.0.145:2181 ls /brokers/ids

Connecting to 192.168.0.145:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[1, 2, 3]

#5 Create topic

### Create topic

kafka/bin/kafka-topics.sh –create –zookeeper 192.168.0.145:2181,192.168.0.175:2181,192.168.0.175:2181 –partitions 3 –replication-factor 3 –topic suicides

 

### List topic

kafka/bin/kafka-topics.sh –list –zookeeper 192.168.0.145:2181,192.168.0.175:2181,192.168.0.175:2181

 

### descirbe topic

[root@kafka1 min]# kafka/bin/kafka-topics.sh –describe –zookeeper 192.168.0.145:2181,192.168.0.175:2181,192.168.0.175:2181

Topic:suicides  PartitionCount:3        ReplicationFactor:3     Configs:

        Topic: suicides Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2

        Topic: suicides Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

        Topic: suicides Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

#6 produce & consume

### produce

kafka/bin/kafka-console-producer.sh –broker-list 192.168.0.145:9092 –topic suicides

### consume

kafka/bin/kafka-console-consumer.sh –bootstrap-server 192.168.0.145:9092 –topic suicides –from-beginning

머신러닝을 활용한 빅데이터 분석 #4

druid에 수집된 데이터셋을 superset에서 시각화를 통한 분석을 수행합니다.

전체 분석 flow

#1 druid 수집

머신러닝을 활용한 빅데이터 분석 #3 에서 수집된 데이터셋의 확인

## druid-kafka indexing list
http://192.168.0.166:8081/#/indexing-service

## dsql에서 데이터셋 확인

select * from suicides4kafka

#2 Druid-Superset

Druid에서 수집된 데이터셋을 Superset과 연결하고 데이터 분석을 수행합니다.

#2-1 Connect Superset-Druid

Druid Cluster와의 연결을 위한 설정을 수행합니다.

연결이 완료되면 Datasources 스캔을 통해 Druid Datasources설정 메뉴에서 데이터셋 스키마를 확인 할 수 있습니다.

#2-2 Analysis

Superset에서 간단하게 수집된 데이터셋에 대한 내용을 확인해 봅니다.

#3 Result

전체 flow

Apachi nifi -> kafka -> druid -> superset

머신러닝을 활용한 빅데이터 분석 #3

Apachi nifi에서 취득한 json 데이터를 kafka broker를 이용해 메세징 처리합니다.

이후, druid indexing처리를 이용하여 분석을 위한 데이터셋를 작성합니다.

전체 분석 flow

#1 Kafka topic 만들기

이하 명령을 실행하여 suicide4 라는 카프카 항목을 만들고 여기에 데이터를 보내십시오

## list kafka topic
bin/kafka-topics.sh –list –zookeeper localhost

## delete kafka topic
bin/kafka-topics.sh –delete –zookeeper localhost –topic suicides2

## 위의 명령으로 삭제 되지 않을 경우, zookeeper shell을 이용하여 삭제
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[suicides3, suicides4, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 3] rmr /brokers/topics/suicides3
[zk: localhost:2181(CONNECTED) 4] rmr /brokers/topics/suicides4
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets]
[zk: localhost:2181(CONNECTED) 6]

## create kafka topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic suicides4

#2 Druid Kafka ingestion

Druid의 Kafka 인덱싱 서비스를 사용하여 #1에서 작성한 suicides4에서 메시지를 수집합니다.

vi sucides4-kafka-supervisor.json

{

  “type”: “kafka”,

  “dataSchema”: {

    “dataSource”: “suicides4”,

    “parser”: {

      “type”: “string”,

      “parseSpec”: {

        “format”: “json”,

        “timestampSpec”: {

          “column”: “time”,

          “format”: “auto”

        },

        “dimensionsSpec”: {     

        “timestampSpec”: {

          “column”: “time”,

          “format”: “auto”

         },

          “dimensions”: [“time”,”country”,”year”,”sex”,”agegroup”,”count_of_suicides”,“population”,

 “suicide_rate”,”country_year_composite_key”,”HDI_for_year”,”gdp_for_year”,”gdp_per_capita”,”generation”]

        }

      }

    },

    “metricsSpec” : [],

    “granularitySpec”: {

      “type”: “uniform”,

      “segmentGranularity”: “DAY”,

      “queryGranularity”: “NONE”,

      “rollup”: false

    }

  },

  “tuningConfig”: {

    “type”: “kafka”,

    “reportParseExceptions”: false

  },

  “ioConfig”: {

    “topic”: “suicides4”,

    “replicas”: 2,

    “taskDuration”: “PT10M”,

    “completionTimeout”: “PT20M”,

    “consumerProperties”: {

      “bootstrap.servers”: “localhost:9092”

    }

  }

}

## Enable Druid Kafka ingestion
curl -XPOST -H’Content-Type: application/json’ -d @/home/min/work/sucides4-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor

#3 Result

kafka consumer 확인

./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic suicides4

머신러닝을 활용한 빅데이터 분석 #2

Apachi nifi를 이용해서 csv파일을 로드하고 json형태의 데이타로 data transform을 구현합니다.

전체 분석 flow

#1 Dataset 개요

https://www.kaggle.com/russellyates88/suicide-rates-overview-1985-to-2016
시간,장소, 연령 등의 요소가 포함된 데이터 셋을 분석하여 자살율 증가를 예방함을 목적으로 합니다. 

데이터 파일의 구성요소
 

country, year, sex, age group, count of suicides, population, suicide rate, country-year composite key, HDI for year, gdp_for_year, gdp_per_capita, generation (based on age grouping average).

#2 Load Data with NIFI

프로세서 flow

GetFile -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> PublishKafka

#2-1 GetFile Processor

Apache NiFi 프로세서는 데이터 흐름을 만드는 블록입니다. 모든 프로세서는 출력 흐름 파일 생성에 기여하는 각각의 기능을 가지고 있습니다. 아래 이미지에 표시된 데이터 흐름은 GetFile 프로세서를 사용하여 한 sucides cvs 파일을 가져 와서 PutFile 프로세서를 사용하여 다른 디렉터리에 저장합니다.

Input Directory, File Filter란에 수집대상 cvs파일의 Directory 및 파일명을 설정합니다.

#2-2 InferAvroSchema Processor

입력 받은 콘텐츠에서 Avro 스키마를 자동으로 생성합니다.

CSV Header Definition
 

country,year,sex,agegroup,count_of_suicides,population,suicide_rate,country_year_composite_key,HDI_for_year,gdp_for_year,gdp_per_capita,generation

#2-3 ConvertCSVToAvro Processor

Avro 스키마에 따라 CSV 파일을 Avro로 변환합니다.

#2-4 ConvertAvroToJSON Processor

Avro 레코드를 JSON 객체로 변환합니다. 이 프로세서는 Avro 필드를 JSON 필드에 직접 매핑하여 결과 JSON이 Avro 문서와 동일한 계층 구조를 갖도록합니다.

#2-5 PublishKafka Processor

Kafka Producer를 사용하여 Stream Data를 Apache Kafka에 메시지로 보냅니다. 

#3 Result

프로세서 flow

GetFile -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> PublishKafka

머신러닝을 활용한 빅데이터 분석 #1

분석 Suicide Rates Overview 1985 to 2016 

 

Suicide Rates Data를 csv형태로 취득한 후, json변환을 거쳐 메세지큐를 통해 Druid에 저장하고 Machine Learning으로 분석 가공한 후, 시각화처리를 수행합니다.

분석에 필요한 솔루션

http://kafka.apache.org/
https://nifi.apache.org/
http://druid.io/
https://scikit-learn.org/
https://superset.incubator.apache.org/

전체 분석 flow

#1 Start Druid

./bin/supervise -c quickstart/tutorial/conf/tutorial-cluster.conf

**Druid 의 경우, zookeeper의 선행 시작이 필수.

#2 Start Kafka broker

./bin/kafka-server-start.sh config/server.properties

#3 Start Nifi

./bin/nifi.sh start