이번 포스팅에서는 데이터 수집기(Springboot Application)와 Elasticsearch에 인증인가 기능을 더해 우리 프로젝트를 SaaS(Software As A Service)처럼 만들어보도록 하겠습니다.
인증인가 기능 추가로 얻고자 하는 것
- 올바른 API SecretKey를 가진 요청만 수집이 가능합니다.
- API SecretKey를 통해 수집 요청 서비스를 구분하고, FileWriter에서 수집 데이터 쓰기 작업 시 디렉터리를 분리합니다.
- 앞서 분리된 디렉터리별로 Elasticsearch로 전송되는 데이터의 인덱스를 구분합니다.
- Kibana에서 서비스별 사용자 계정을 생성하고, 권한이 있는 서비스의 인덱스만을 조회할 수 있도록 합니다.
데이터 수집기 인증 인가
데이터 수집기의 인증인가는 Spring Security를 이용해 대중적으로 많이 쓰이는 방법으로 진행하고자 했습니다.
그 중 API 인증 방식은 크게 두 가지로 나눌 수 있는데, 고정키 방식과 토큰 방식입니다.
고정키 방식
- 서비스별 고정된 키를 발급해 사용하는 방식
- 사용하기는 편하나 보안성이 떨어집니다.(키가 바뀌지 않으니까요)
- 예 : NHN Cloud API, Open API 등등
토큰 방식
- 로그인 과정을 거친 뒤 JWT 토큰을 발급해 사용하는 방식
- 사용하기는 약간 불편하나 보안성은 좋습니다.
이 중에서 고정키 방식을 사용하기로 했고, 아래와 같이 인증 플로우를 설계했습니다.
- Client에서 데이터 수집 요청을 보냅니다.
- KeyAuthenticationFilter에서 X-Secret-Key 헤더 값으로 KeyStore에 키 검증을 요청합니다.
- 유효한 키일 경우 SecurityContext에 인증 정보를 세팅합니다.
- DataCollectController에서 SecurityContext에 저장된 인증 정보에서 키를 가져와 수집 데이터 저장 시 활용합니다.
Spring Security 설정
@Configuration
@EnableWebSecurity
@RequiredArgsConstructor
public class SecurityConfig extends WebSecurityConfigurerAdapter {
private final KeyAuthenticationFilter keyAuthenticationFilter;
@Override
public void configure(WebSecurity web) throws Exception {
super.configure(web);
web.ignoring().antMatchers(
"/v3/api-docs/**",
"/swagger-ui.html"
);
}
@Override
protected void configure(HttpSecurity http) throws Exception {
http
// form 로그인 비활성화
.httpBasic().disable()
// csrf 비활성화
.csrf().disable()
// STATELESS 처리
.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
.and()
.authorizeRequests()
.antMatchers("/swagger-ui.html", "/swagger-ui/**").permitAll()
.anyRequest().authenticated()
.and()
.addFilterBefore(keyAuthenticationFilter, UsernamePasswordAuthenticationFilter.class)
;
}
}
먼저 로그인 기능은 사용하지 않으므로, http.httpBasic().disable()
로 form 로그인을 비활성화했습니다. 또, 데이터 수집 요청마다 헤더에 키를 보낼 것이므로 http.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
로 세션도 사용하지 않도록 했습니다.
Spring Security Filter 중 UsernamePasswordAuthenticationFilter
보다 먼저 KeyAuthenticationFilter
를 추가해 인증을 처리하도록 했습니다.
KeyAuthenticationFilter
@Slf4j
@Component
@RequiredArgsConstructor
public class KeyAuthenticationFilter extends OncePerRequestFilter {
private static final String HEADER_NAME = "X-Secret-Key";
private final KeyStore keyStore;
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
if (hasKeyHeader(request)) {
String key = request.getHeader(HEADER_NAME);
if (keyStore.hasKey(key)) {
SecurityContextHolder.getContext().setAuthentication(new KeyAuthenticationToken(key, null, List.of(new SimpleGrantedAuthority("ROLE_USE"))));
} else {
log.error("유효하지 않은 키입니다. 클라이언트 IP : {}, 키 : {}", HttpRequestUtil.getRemoteIp(), key);
response.setStatus(HttpStatus.BAD_REQUEST.value());
return;
}
} else {
log.error(HEADER_NAME + " 헤더에 값이 없습니다. 클라이언트 IP : {}", HttpRequestUtil.getRemoteIp());
response.setStatus(HttpStatus.BAD_REQUEST.value());
return;
}
filterChain.doFilter(request, response);
}
private boolean hasKeyHeader(HttpServletRequest request) {
return request.getHeader(HEADER_NAME) != null && !request.getHeader(HEADER_NAME).isEmpty() && !request.getHeader(HEADER_NAME).isBlank();
}
}
X-Secret-Key 헤더의 값이 존재하는 지 검사하고, KeyStore에 해당 키가 존재하는 지 확인합니다. 유효한 키일 경우 SecurityContext에 KeyAuthenticationToken
을 생성해 저장합니다.
KeyStore
application.properties
에서 keystore.keys.*
설정 값을 읽습니다. name은 서비스명이 되고, value는 SecretKey가 됩니다.
# Secret Key Store
keystore.keys.service-a=thisissecretkey
keystore.keys.service-b=yektercessisiht
FileWriter
@Slf4j
@Component
@RequiredArgsConstructor
public class FileWriter implements Writer{
private static final String SEPARATOR = "\n";
private static final DateTimeFormatter FILE_NAME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmmss");
private static final String FILE_EXT = ".txt";
private final ObjectMapper objectMapper;
private final KeyStore keyStore;
@Value("${writer.file.root-path:/}")
private String fileRootPath;
private Map<String, Path> filePaths = new HashMap<>();
@Override
public boolean isWritable() {
return !this.filePaths.isEmpty() && this.filePaths.values().stream().allMatch(Files::exists);
}
@Override
public void write(Collection<CollectedData> dataCollection) throws IOException {
Map<String, List<CollectedData>> collectDataMapByKey = dataCollection.stream().collect(Collectors.groupingBy(CollectedData::getSecretKey));
for (Entry<String, List<CollectedData>> entry : collectDataMapByKey.entrySet()) {
String key = entry.getKey();
StringBuilder stringBuilder = new StringBuilder();
for (CollectedData collectedData : entry.getValue()) {
stringBuilder.append(objectMapper.writeValueAsString(collectedData));
stringBuilder.append(SEPARATOR);
}
String valueAsString = stringBuilder.toString();
Files.write(this.filePaths.get(keyStore.getKeyName(key)), valueAsString.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
}
/**
* 1시간 주기로 데이터를 작성할 파일을 생성합니다.
*/
@Scheduled(fixedDelay = 1000 * 60 * 60L)
public void createFileTask() throws IOException {
createFile();
}
/**
* 데이터를 작성할 파일을 생성합니다.
*
* @throws IOException
*/
private void createFile() throws IOException {
for (String keyName : keyStore.getKeys().keySet()) {
Path path = Path.of(this.fileRootPath, keyName, createFileName());
Files.createDirectories(path.getParent());
this.filePaths.put(keyName, Files.createFile(path));
log.info("파일이 생성되었습니다.");
}
}
private String createFileName() {
return ZonedDateTime.now().format(FILE_NAME_FORMATTER) + FILE_EXT;
}
}
KeyStore에 존재하는 SecretKey 별로 파일 디렉터리를 생성하고, 수집 데이터를 SecretKey로 분류해 적절한 디렉터리에 존재하는 파일에 씁니다.
Logstash Pipeline config
앞선 과정까지 진행하면, 수집된 데이터들은 서비스별로 디렉터리 단위로 분리되어 저장됩니다.
그러나 기존 Logstash 파이프라인 설정은 전체 폴더를 대상으로 고정된 인덱스를 지정해주고 있었기 때문에 변경이 필요합니다.
input {
file {
path => "/source-data/*/*.txt"
...
}
}
filter {
grok {
match => ["path", "%{GREEDYDATA}/%{DATA:folder-name}/%{WORD}.%{WORD}"]
}
...
}
output {
elasticsearch {
hosts => ["http://es01:9200","http://es02:9200","http://es03:9200"]
index => "%{folder-name}-%{+YYYY.MM.dd}"
}
}
먼저 수집 데이터 파일의 디렉터리가 한 단계 추가되었기 때문에, input 플러그인의 path 속성을 수정해줍니다.
그리고 grok filter plugin으로 path 필드 값에서 파일의 부모 폴더 명을 추출하고, output filter 에서 추출한 값을 인덱스로 지정해줍니다.
ELK Stack X Pack Security
버전 6.8.0과 7.1.0부터 Elasticsearch의 X Pack 중 보안기능(Security)이 무료로 제공되었습니다.
Elasticsearch 구축 시 최소 세 대의 서버 인스턴스가 필요하기 때문에, 서버 자원이 넉넉하지 않는 이상 같은 Elasticsearch에 여러 서비스의 데이터를 저장할 일이 있을겁니다. 그렇다면 여러 서비스의 사용자가 접속하기 때문에 Elasticsearch와 Kibana에 당연히 인증 인가가 추가되어야겠죠?
docker-compose.yml
services:
es01:
environment:
- xpack.security.enabled=true
- ELASTIC_PASSWORD=1hoon
...
es02:
environment:
- xpack.security.enabled=true
...
es03:
environment:
- xpack.security.enabled=true
...
kibana:
environment:
- ELASTIC_PASSWORD=1hoon
Elasticsearch 컨테이너에 xpack.security.enabled
속성 값을 true
로 지정해 X Pack Security를 활성화합니다.
또, Elasticsearch 컨테이너와 Kibana 컨테이너에 ELASTIC_PASSWORD
로 elastic 계정의 패스워드를 지정합니다.
Logstash Pipeline config
output {
elasticsearch {
...
user => "logstash_pipeline"
password => "logstash"
}
}
elasticsearch output plugin 에 user, password 를 입력합니다. 이 계정은 아직은 없지만, Elasticsearch 구동 후 Logstash 전용 계정으로 만들어줄겁니다.
User 및 Role 생성
http://127.0.0.1:5601
으로 접속합니다.- Username : elastic
- Password : 1hoon
- Management > Stack Management > Security > Roles 메뉴로 이동합니다.
Create role
버튼을 클릭합니다.- 아래 항목을 입력한 뒤
Create role
버튼을 클릭합니다.- Role name : logstash_writer
- Cluster privileges :
manage_index_templates
,monitor
,manage_ilm
- Index privileges
- indices :
*
- Privileges :
write
,create
,create_index
,manage
,manage_ilm
- indices :
- Management > Stack Management > Security > Users 메뉴로 이동합니다.
Create User
버튼을 클릭합니다.- 아래 항목을 입력한 뒤
Create user
버튼을 클릭합니다.- Profile
- Username : logstash_pipeline
- Password
- Password : logstash
- Privileges
- Roles :
logstash_writer
- Roles :
- Profile
데이터 수집
모든 설정과 개발이 완료되었습니다! ^ㅁ^
이제 실제로 ELK Stack과 애플리케이션을 구동하고 데이터 수집 요청을 보내 서비스별로 정상적으로 인덱스가 분리되는 지 확인해보도록 하겠습니다.
먼저 application.properties
에서 writer.file.root-path
속성을 docker-compose.yml 의 Logstash 볼륨 마운트 경로로 지정합니다.
# Writer properties
writer.file.root-path=/Users/1hoon/data-collector
Data Collector 애플리케이션과 Docker compose를 구동합니다.
http://127.0.0.1:8080/data 로 수집할 데이터를 요청합니다. 이 때, 헤더에 X-Secret-Key
를 추가하고 값은 application.properties
에서 keystore.keys.* 속성 값을 입력합니다. (아래 예에서는 thisissecretkey
)
# Secret Key Store
keystore.keys.service-a=thisissecretkey
keystore.keys.service-b=yektercessisiht
http://127.0.0.1:5601
으로 접속 후 Management > Stack Management > Kibana > Index Patterns 메뉴로 이동합니다.
- Username : elastic
- Password : 1hoon
Create index pattern
버튼을 클릭했을 때, 수집한 데이터의 서비스 명으로 시작하는 인덱스 패턴이 생성되었는 지 확인합니다.
'📦 ETC > TOY PROJECT' 카테고리의 다른 글
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 트러블슈팅 (1) - Elasticsearch (0) | 2022.03.17 |
---|---|
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 데이터 수집해서 시각화 하기 (2) (0) | 2022.01.30 |
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 데이터 수집해서 시각화 하기 (1) (0) | 2022.01.30 |
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 반 정형 데이터 수집기 만들기 (0) | 2022.01.02 |