ELK-kibana查询语法

kibana使用的lucene查询语法

全文搜索
在搜索栏输入login,会返回所有字段值中包含login的文档
使用双引号包起来作为一个短语搜索 “like Gecko”

字段
可以按页面左侧显示的字段搜索
限定字段全文搜索:field:value 例如:content:elapsed time
精确搜索:关键字加上双引号 filed:”value” 例如:http.code:404 搜索http状态码为404的文档

字段本身是否存在
_exists_:http:返回结果中需要有http字段
_missing_:http:不能含有http字段

通配符
? 匹配单个字符
* 匹配0到多个字符
kiba?a, el*search
? * 不能用作第一个字符,例如:?text *text

正则
es支持部分正则功能
mesg:/mes{2}ages?/

模糊搜索
~:在一个单词后面加上~启用模糊搜索
first~ 也能匹配到 frist
还可以指定需要多少相似度

cromm~0.3 会匹配到 from 和 chrome
数值范围0.0 ~ 1.0,默认0.5,越大越接近搜索的原始值

近似搜索
在短语后面加上~
“select where”~3 表示 select 和 where 中间隔着3个单词以内

范围搜索
数值和时间类型的字段可以对某一范围进行查询
length:[100 TO 200]
date:{“now-6h” TO “now”}
[ ] 表示端点数值包含在范围内,{ } 表示端点数值不包含在范围内

逻辑操作
AND OR,举例为:serverId:NCC AND callerId:iOS +”/c/app/house/search” +”elapsed time”
+:搜索结果中必须包含此项
-:不能含有此项
+apache -jakarta test:结果中必须存在apache,不能有jakarta,test可有可无

分组
(jakarta OR apache) AND jakarta

字段分组
title:(+return +”pink panther”)

转义特殊字符
+ – && || ! () {} [] ^” ~ * ? : \
以上字符当作值搜索的时候需要用\转义

编程思想读书笔记

一些读书demo例子,目录为:

1.初始化过程
2.string一些demo
3.Callable使用
4.CyclicBarrier使用
5.volatile使用
6.CountDownLatch使用

使用demo例子为:
1.初始化过程

package com.learn.day1;

public class LearnDemo01 {

    /**
     * static{} >> {} >> DemoClass(){}构造块
     * 
     * @author XiusongHU
     */
    public static class DemoClass {
        private static final String KEY_PREFIX = "hello_";// final static静态常量,不可变
        
        static {// 静态块
            System.out.println("static静态块被调用了");
        }
        
        {// 方法块
            System.out.println("{}方法块被调用了");
        }
        
        public DemoClass() {// 构造方法
            System.out.println("构造方法被调用了");
        }
        
        public void sayHello() {// 普通成员方法
            System.out.println("sayHello方法被调用了, key_prefix为:" + KEY_PREFIX);
        }
    }

    public static void main(String[] args) {
        DemoClass demo = new DemoClass();
        demo.sayHello();
        DemoClass demo2 = new DemoClass();
        demo2.sayHello();
    }

}

2.string一些demo

package com.learn.day1;

public class LearnDemo02 {

    /**
     * String一些demo例子,方法区里边的常量池
     * 
     * @param args
     */
    public static void main(String[] args) {
        String str1 = new String("hello world");
        String str2 = new String("hello world");
        String str3 = "hello world";
        String str4 = "hello world".intern();
        String str5 = new StringBuilder("hello world").toString();
        String str6 = new StringBuilder("你好,").append("中国").toString();// 若注释掉str7,则在调用str6.intern()时候“你好,中国”满足首次出现
        String str7 = new StringBuilder("你好,中国").toString();

        System.out.println("str1==str2?" + (str1 == str2));// false
        System.out.println("str1==str3?" + (str1 == str3));// false
        System.out.println("str3==str4?" + (str3 == str4));// true
        System.out.println("str5.intern()==str5?" + (str5.intern() == str5));// false
        System.out.println("str6.intern()==str6?" + (str6.intern() == str6));// false,若注释掉str7则此处应该返回true,满足首次出现
        System.out.println("str7.intern()==str7?" + (str7.intern() == str7));// false
    }

}

3.Callable使用

package com.learn.day1;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LearnDemo03 {

    /**
     * 定义模拟result返回的实体类
     * 
     * @author XiusongHU
     */
    public static final class DemoClass {
        private String name;
        private int age;

        public DemoClass() {
        }

        public DemoClass(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        @Override
        public String toString() {
            return "name:" + name + ",age:" + age;
        }
    }

    public static void main(String[] args) throws Exception {
        // 定义并实例化固定线程数的线程池
        int nThreads = 5;
        ExecutorService service = Executors.newFixedThreadPool(nThreads);

        // Callable<T>任务是有返回值,future对象
        Future<DemoClass> future = service.submit(new Callable<DemoClass>() {

            @Override
            public DemoClass call() throws Exception {
                System.out.println("Callable.call()方法被调用了");

                // pool-1-thread-1
                System.out.println("当前回调所在线程为:" + Thread.currentThread().getName());

                DemoClass result = new DemoClass();
                result.setName("hello world");
                result.setAge(23);

                return result;
            }

        });

        // false
        System.out.println("future.isDone?" + future.isDone());
        // false
        System.out.println("future.isCancel?" + future.isCancelled());

        // 第一次调用get()获取返回数据对象
        DemoClass result = future.get();
        System.out.println(result);

        // 第二次获取返回数据对象,依旧有返回值
        DemoClass result2 = future.get();
        System.out.println(result2);

        // true
        System.out.println("future.isDone?" + future.isDone());
        // false
        System.out.println("future.isCancel?" + future.isCancelled());

        // 销毁线程池
        // service.shutdownNow();

        // 销毁线程池,等待任务都调度完毕了才销毁处理
        service.shutdown();
    }
}

4.CyclicBarrier使用

package com.learn.day1;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * CyclicBarrier循环栅栏,表示多个线程共同等待一个公共的屏障,全部到达屏障后,然后继续一起往下执行
 * 
 * @author XiusongHU
 */
public class LearnDemo04 {

    // 全局计数器:原子性+可见性
    public static volatile AtomicInteger GLOBAL_COUNTER = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        // 10名运动员,若此处定义小于10,则程序无法往下执行了
        ExecutorService service = Executors.newFixedThreadPool(10);

        // 10名运动员参与比赛,因此设定为10
        CyclicBarrier barrier = new CyclicBarrier(10);

        // 运动员准备处理
        for (int i = 1; i <= 10; i++) {
            service.execute(new RunningTask(String.format("运动员:%d号", i), barrier));
        }

        // 若全局没有达到10,则表示运动员没有全部就位
        while (GLOBAL_COUNTER.get() < 10) {
            // 这边不做任何处理
        }
        System.out.println("10名运动员准备耗时为:" + (System.currentTimeMillis() - startTime) + "ms");

        // 销毁线程池
        service.shutdown();
    }

    public static class RunningTask implements Runnable {

        // 运动员名称
        private String runnerName;
        // 公共屏障,待全部线程到达该屏障后,释放继续往下执行
        private CyclicBarrier singnal;

        public RunningTask(String name, CyclicBarrier singnal) {
            this.runnerName = name;
            this.singnal = singnal;
        }

        @Override
        public void run() {
            try {
                // 模拟准备,休眠0-10秒时间
                Random random = new Random();
                Thread.sleep(random.nextInt(10000));

                System.out.println(String.format("运动员:%s准备完毕,等待发令", runnerName));
                try {
                    // 等待其它运动员准备好了才继续往下走
                    int count = singnal.await();

                    System.out.println(String.format("当前线程:%s,运动员:%s,count计数为:%d", Thread.currentThread().getName(), runnerName, count));
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 采用volatile+CAS方式处理计数
                GLOBAL_COUNTER.incrementAndGet();

                System.out.println(String.format("运动员:%s起跑了", runnerName));
            }
        }

        public String getRunnerName() {
            return runnerName;
        }

        public void setRunnerName(String runnerName) {
            this.runnerName = runnerName;
        }

        public CyclicBarrier getSingnal() {
            return singnal;
        }

        public void setSingnal(CyclicBarrier singnal) {
            this.singnal = singnal;
        }

    }

}

5.volatile使用

package com.learn.day1;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多线程环境下模拟计数器,利用到了多线程两个特性:原子性+可见性
 * 
 * @author XiusongHU
 */
public class LearnDemo05 {

    // 全局计数器对象
    private static volatile AtomicInteger GLOBAL_COUNTER = new AtomicInteger();

    public static void main(String[] args) {
        // 创建出200个线程,模拟计数程序
        for (int i = 0; i < 200; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(new Runnable() {

                @Override
                public void run() {
                    // volatile+CAS组合保证多线程环境下计数的准确性
                    int nowCounter = GLOBAL_COUNTER.incrementAndGet();
                    System.out.println("当前线程为:" + Thread.currentThread().getName() + ",当前计数为:" + nowCounter);
                }

            }, "Thread_" + (i + 1)).start();
        }

        // 若计算未达到200,表示分支线程都还没处理完毕,主线程在此处循环等待
        while (GLOBAL_COUNTER.get() < 200) {
        }
        System.out.println("计数完成,总数为:" + GLOBAL_COUNTER.get());
    }

}

6.CountDownLatch使用

package com.learn.day1;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 通过发令枪响>>运动员起跑>>最后一个到达终点场景模拟使用CountDownLatch
 * 
 * @author XiusongHU
 */
public class LearnDemo06 {

    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(10);

        // 发令启动锁,此处倒计时1由main主线程来释放
        final CountDownLatch beginLatch = new CountDownLatch(1);

        // 全部到达终点倒计时锁
        final CountDownLatch allEndLatch = new CountDownLatch(10);

        System.out.println("通过百米10人赛跑模拟如何使用CountDownLatch类");

        // 创建10个线程模拟赛跑中的10个运动员
        for (int i = 1; i <= 10; i++) {
            final String runnerName = String.format("运动员%d号", i);
            service.execute(new Runnable() {

                @Override
                public void run() {
                    long startTime = System.currentTimeMillis();
                    System.out.println("当前线程为:" + Thread.currentThread().getName() + ",运动员:" + runnerName + "准备完毕,等待发枪");

                    try {
                        // 等待发令员发出命令,未发出令之前,运动员都在此处等待
                        beginLatch.await();

                        // 发令完毕,模拟跑步过程中,15秒内完成
                        Thread.sleep(new Random().nextInt(15000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        System.out.println("等待发令员beginLatch.await()发生异常," + e);
                    } finally {
                        // 跑步完成后,需要将计数减少1,表示完成了一个运动员
                        allEndLatch.countDown();
                        System.out.println("当前线程为:" + Thread.currentThread().getName() + ",运动员:" + runnerName + "完成了跑步,耗时为:"
                                + (System.currentTimeMillis() - startTime) / 1000 + "s");
                    }
                }

            });
        }

        // 主线程休眠100毫秒,停顿一会
        Thread.sleep(100);

        // 发令员下发起跑命令
        System.out.println("发令员准备就绪,准备发枪");
        beginLatch.countDown();
        long startTime = System.currentTimeMillis();

        // 终点裁判等待,10个运动员都到达终点才结束计时
        allEndLatch.await();

        System.out.println("终点计时裁判计时完毕,最后一个通过耗时为:" + (System.currentTimeMillis() - startTime) / 1000 + "s");

        // 关闭线程池资源
        service.shutdown();
    }

}

Map源码学习

本文目录:
1.map接口方法汇总
2.hashmap使用demo
3.hashmap方法源码解读

全文内容:
1.map接口方法汇总

public interface Map<K,V> {
	// key-value键值对长度size
	int size();
	
	// map容器是否为空,若为空返回true
	boolean isEmpty();
	
	// map容器是否含key,若有返回true
	boolean containsKey(Object key);
	
	// map容器是否含value,若有返回true
	boolean containsValue(Object value);
	
	// 从map容器获取key对应的value对象
	V get(Object key);
	
	// 往map容器设置key-value键值对数据
	V put(K key, V value);
	
	// 从map容器移除掉key键值对数据
	V remove(Object key);
	
	// 批量put接口,往map里边放置key-values键值堆数据
	void putAll(Map<? extends K, ? extends V> m);
	
	// 清理掉map容器里边的所有key-value键值对数据
	void clear();
	
	// 返回map容器key键集合数据
	Set<K> keySet();
	
	// 返回map容器value值集合数据列表
	Collection<V> values();
	
	// 返回map容器key-value键值对entry集合数据
	Set<Map.Entry<K, V>> entrySet();
	
	// 参考Object对象的equals方法
	boolean equals(Object o);
	
	// 参考Object对象的hashCode方法
	int hashCode();
	
	// Map内部的Entry<K,V>接口,特别重要的接口
	interface Entry<K,V> {
		// 获取entry对象的key键数据
		K getKey();
		
		// 获取entry对象的value值数据
		V getValue();
		
		// 设置entry对象的value值数据,相当于是替换覆盖value数据
		V setValue(V value);
		
		// 参考Object对象的equals方法
		boolean equals(Object o);
		
		// 参考Object对象的hashCode方法
		int hashCode();
	}
}

2.hashmap使用demo

package com.learn.day1;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * HashMap使用demo例子
 * 
 * @author XiusongHU
 */
public class HashMapDemo {

    public static void main(String[] args) {
        // 定义并实例化key对象
        KeyObject key1 = new KeyObject("key1", 1);
        KeyObject key2 = new KeyObject("key2", 2);
        KeyObject key3 = new KeyObject("key3", 3);

        // 定义并实例化value对象
        ValueObject v1 = new ValueObject("v1", 1, true);
        ValueObject v2 = new ValueObject("v2", 2, false);
        ValueObject v3 = new ValueObject("v3", 3, true);

        Map<KeyObject, ValueObject> map = new HashMap<KeyObject, ValueObject>();
        Map<KeyObject, ValueObject> childMap = new HashMap<KeyObject, ValueObject>();

        // 调用put方法
        map.put(key1, v1);
        childMap.put(key2, v2);
        childMap.put(key3, v3);

        // 调用putAll方法
        map.putAll(childMap);

        // 调用size方法
        System.out.println("map.size为:" + map.size());
        // 调用isEmpty方法
        System.out.println("map.isEmpty为:" + map.isEmpty());

        // 调用containsKey方法
        System.out.println("map.containsKey(k1)为:" + map.containsKey(key1));
        // 调用containsValue方法
        System.out.println("map.containsValue(v2)为:" + map.containsValue(v2));

        // 调用get方法
        System.out.println("map.get(k3)为:" + map.get(key3));

        // 调用keySet方法返回map里边所有键数据集合
        Set<KeyObject> keys = map.keySet();
        for (KeyObject key : keys) {
            System.out.println("keyDetail为:" + key.toString());
        }

        // 调用values方法返回map里边所有值数据列表
        Collection<ValueObject> values = map.values();
        for (ValueObject value : values) {
            System.out.println("valueDetail为:" + value.toString());
        }

        // 调用entrySet方法返回键值对数据集合
        Set<Map.Entry<KeyObject, ValueObject>> entrys = map.entrySet();
        for (Map.Entry<KeyObject, ValueObject> entry : entrys) {
            System.out.println("entry.key为:" + entry.getKey());
            System.out.println("entry.value为:" + entry.getValue());
        }

        // 调用remove方法移除掉元素
        map.remove(key1);
        System.out.println("移除掉k1之后,map.size为:" + map.size());

        // 调用clear方法清理掉map
        map.clear();
        System.out.println("map.size为:" + map.size());
        System.out.println("map.isEmpty为:" + map.isEmpty());
    }

    /**
     * 定义map接口的key对象,覆写equals和hashCode方法
     * 
     * @author XiusongHU
     */
    public static class KeyObject {
        // 名称,模拟成员属性
        private String name;
        // 数量,模拟成员属性
        private int num;

        public KeyObject(String name, int num) {
            this.name = name;
            this.num = num;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getNum() {
            return num;
        }

        public void setNum(int num) {
            this.num = num;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((name == null) ? 0 : name.hashCode());
            result = prime * result + num;
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            KeyObject other = (KeyObject) obj;
            if (name == null) {
                if (other.name != null)
                    return false;
            } else if (!name.equals(other.name))
                return false;
            if (num != other.num)
                return false;
            return true;
        }

        @Override
        public String toString() {
            return "KeyObject [name=" + name + ", num=" + num + "]";
        }
    }

    /**
     * 定义map接口的value值对象
     * 
     * @author XiusongHU
     */
    public static class ValueObject {
        // 用户名
        private String name;
        // 年龄
        private int age;
        // 是否男性:若为男性则为true,否则为false
        private boolean male;

        public ValueObject(String name, int age, boolean male) {
            this.name = name;
            this.age = age;
            this.male = male;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public boolean isMale() {
            return male;
        }

        public void setMale(boolean male) {
            this.male = male;
        }

        @Override
        public String toString() {
            return "ValueObject [name=" + name + ", age=" + age + ", male=" + male + "]";
        }
    }

}

执行上述程序,输出结果为:

map.size为:3
map.isEmpty为:false
map.containsKey(k1)为:true
map.containsValue(v2)为:true
map.get(k3)为:ValueObject [name=v3, age=3, male=true]
keyDetail为:KeyObject [name=key2, num=2]
keyDetail为:KeyObject [name=key1, num=1]
keyDetail为:KeyObject [name=key3, num=3]
valueDetail为:ValueObject [name=v2, age=2, male=false]
valueDetail为:ValueObject [name=v1, age=1, male=true]
valueDetail为:ValueObject [name=v3, age=3, male=true]
entry.key为:KeyObject [name=key2, num=2]
entry.value为:ValueObject [name=v2, age=2, male=false]
entry.key为:KeyObject [name=key1, num=1]
entry.value为:ValueObject [name=v1, age=1, male=true]
entry.key为:KeyObject [name=key3, num=3]
entry.value为:ValueObject [name=v3, age=3, male=true]
移除掉k1之后,map.size为:2
map.size为:0
map.isEmpty为:true

3.hashmap方法源码解读
HashMap类定义为:

public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable{
	// 1.继承AbstractMap抽象类
	// 2.实现三个接口,分别为:Map<K,V>、Cloneable、Serializable
}

HashMap最重要的两个方法,一个是put另外一个是get方法,源码分析为:
1.无参数的构造方法,内部使用默认参数

// 16:初始化容量大小,2的四次方
// 0.75f:默认加载因子
public HashMap() {
	this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
}

2.有参数构造方法,容量和加载因子

public HashMap(int initialCapacity, float loadFactor) {
	if (initialCapacity < 0)
		throw new IllegalArgumentException("Illegal initial capacity: " +
										   initialCapacity);
	if (initialCapacity > MAXIMUM_CAPACITY)
		initialCapacity = MAXIMUM_CAPACITY;
	if (loadFactor <= 0 || Float.isNaN(loadFactor))
		throw new IllegalArgumentException("Illegal load factor: " +
										   loadFactor);
	
	// loadFactor使用了final修饰,构造方法初始化完毕后,不能再次修改loadfactor值
	this.loadFactor = loadFactor;
	// threshold设置为初始化容量大小,此处为16
	threshold = initialCapacity;
	
	// 调用init初始化方法,见3.X解释
	init();
}

3.有参数构造方法,init方法

// Initialization hook for subclasses,给子类来实现,貌似比较少看到继承HashMap然后实现这个方法的
void init() {
	// 此处暂时没有实现
}

接着,我们看map.put方法为:
1.put主方法,里边子方法,见2.X。。。以此类推

public V put(K key, V value) {
	if (table == EMPTY_TABLE) {
		// 若table为空,初始化table数组,见2.1
		inflateTable(threshold);
	}
	
	// 若key为null,调用putForNullKey方法,HashMap支持放入null作为key,见2.2
	if (key == null)
		return putForNullKey(value);
		
	// 计算key的hash值,见下边的hash方法,见2.3
	int hash = hash(key);
	
	// 计算出key放在table数组的什么index位置,见下边的indexFor方法,见2.4
	int i = indexFor(hash, table.length);
	
	// 遍历table[i]上的entry链表,判断是否存在相同的key,若存在直接覆盖,不存在走到addEntry方法
	for (Entry<K,V> e = table[i]; e != null; e = e.next) {
		Object k;
		// 判定条件为:
		// hash值相同 + key对象引用相同或key.equals方法返回true
		if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
			V oldValue = e.value;
			e.value = value;
			// 此处作用,见下边分析recordAccess方法,传入的是this对象2.6
			e.recordAccess(this);
			// 若遇到相同的,新的覆盖掉旧的,然后返回旧的value数据值
			return oldValue;
		}
	}

	modCount++;
	// 添加新节点,见2.5
	addEntry(hash, key, value, i);
	return null;
}

2.1 inflateTable方法,参数为:threshold,默认为16

 
private void inflateTable(int toSize) {
	// Find a power of 2 >= toSize,见方法2.2.1
	int capacity = roundUpToPowerOf2(toSize);

	threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
	// 此处为关键点,初始化table数组,数组大小为capacity,Entry见2.2.2
	table = new Entry[capacity];
	// 见2.2.3
	initHashSeedAsNeeded(capacity);
}
// 2.2.1 roundUpToPowerOf2方法
1
private static int roundUpToPowerOf2(int number) {
	// 返回最靠近number的2的次方的值,比如number为15,返回为16,2的四次方
	int rounded = number >= MAXIMUM_CAPACITY
			? MAXIMUM_CAPACITY
			: (rounded = Integer.highestOneBit(number)) != 0
				? (Integer.bitCount(number) > 1) ? rounded << 1 : rounded
				: 1;

	return rounded;
}
// 2.2.2 Entry为HashMap的内部类
static class Entry<K,V> implements Map.Entry<K,V> {
	// entry的key键
	final K key;
	// entry的value值
	V value;
	// 处理碰撞关键的next节点引用
	Entry<K,V> next;
	// entry的key计算出的hash值
	int hash;
	
	// 关键点:Entry只提供了一个构造方法,四个参数=全参数构造方法
	Entry(int h, K k, V v, Entry<K,V> n) {
		value = v;
		next = n;
		key = k;
		hash = h;
	}

	public final K getKey() {
		return key;
	}

	public final V getValue() {
		return value;
	}

	public final V setValue(V newValue) {
		V oldValue = value;
		value = newValue;
		return oldValue;
	}

	public final boolean equals(Object o) {
		if (!(o instanceof Map.Entry))
			return false;
		Map.Entry e = (Map.Entry)o;
		Object k1 = getKey();
		Object k2 = e.getKey();
		if (k1 == k2 || (k1 != null && k1.equals(k2))) {
			Object v1 = getValue();
			Object v2 = e.getValue();
			if (v1 == v2 || (v1 != null && v1.equals(v2)))
				return true;
		}
		return false;
	}

	public final int hashCode() {
		return Objects.hashCode(getKey()) ^ Objects.hashCode(getValue());
	}

	public final String toString() {
		return getKey() + "=" + getValue();
	}
	
	void recordAccess(HashMap<K,V> m) {
	}
	
	void recordRemoval(HashMap<K,V> m) {
	}
}
// 2.2.3 initHashSeedAsNeeded方法,处理hashSeed开关
final boolean initHashSeedAsNeeded(int capacity) {
	boolean currentAltHashing = hashSeed != 0;
	boolean useAltHashing = sun.misc.VM.isBooted() &&
			(capacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
	boolean switching = currentAltHashing ^ useAltHashing;
	if (switching) {
		hashSeed = useAltHashing
			? sun.misc.Hashing.randomHashSeed(this)
			: 0;
	}
	return switching;
}

2.2 putForNullKey方法设置null作为键

private V putForNullKey(V value) {
	// null键,hash设置为0,放在table的第一个bucket位置上
	for (Entry<K,V> e = table[0]; e != null; e = e.next) {
		if (e.key == null) {
			V oldValue = e.value;
			e.value = value;
			e.recordAccess(this);
			return oldValue;
		}
	}
	modCount++;
	// hash为0,bucketIndex为0,放在table的第一个位置上
	addEntry(0, null, value, 0);
	return null;
}

2.3 hash方法,内部实现,待定研究为啥这样处理,TODO

final int hash(Object k) {
	int h = hashSeed;
	if (0 != h && k instanceof String) {
		return sun.misc.Hashing.stringHash32((String) k);
	}

	h ^= k.hashCode();

	// This function ensures that hashCodes that differ only by
	// constant multiples at each bit position have a bounded
	// number of collisions (approximately 8 at default load factor).
	h ^= (h >>> 20) ^ (h >>> 12);
	return h ^ (h >>> 7) ^ (h >>> 4);
}

2.4 indexFor方法,传入hash和table.length计算出key放的bucketIndex位置

static int indexFor(int h, int length) {
	// hashmap对比hashtable做了改进,hashtable采用除法取模方式,hashmap采用&处理,效率更高
	return h & (length-1);
}

2.5 addEntry方法,添加entry节点

void addEntry(int hash, K key, V value, int bucketIndex) {
	// 判断size若超过了threshold阈值,则进行扩容处理,调用resize方法
	if ((size >= threshold) && (null != table[bucketIndex])) {
		// 扩容方法,见下文分析
		resize(2 * table.length);
		hash = (null != key) ? hash(key) : 0;
		bucketIndex = indexFor(hash, table.length);
	}

	// 添加entry节点方法
	createEntry(hash, key, value, bucketIndex);
}
// resize扩容方法为:newCapacity为老的capacity的两倍大小
void resize(int newCapacity) {
	Entry[] oldTable = table;
	int oldCapacity = oldTable.length;
	// 已经超过了最大,扩容失败,直接返回
	if (oldCapacity == MAXIMUM_CAPACITY) {
		threshold = Integer.MAX_VALUE;
		return;
	}

	// 创建一个新的newcapacity大小的table数组
	Entry[] newTable = new Entry[newCapacity];
	// 将old table里边的数据搬迁到new table里边
	transfer(newTable, initHashSeedAsNeeded(newCapacity));
	
	// 将new table覆盖给table
	table = newTable;
	// 重新设定好threshold,newcapacity * loadfactor
	threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
// transfer方法为:
void transfer(Entry[] newTable, boolean rehash) {
	int newCapacity = newTable.length;
	for (Entry<K,V> e : table) {
		while(null != e) {
			Entry<K,V> next = e.next;
			if (rehash) {
				e.hash = null == e.key ? 0 : hash(e.key);
			}
			int i = indexFor(e.hash, newCapacity);
			e.next = newTable[i];
			newTable[i] = e;
			e = next;
		}
	}
}
// createEntry方法为:
void createEntry(int hash, K key, V value, int bucketIndex) {
	Entry<K,V> e = table[bucketIndex];
	table[bucketIndex] = new Entry<>(hash, key, value, e);
	size++;
}

继续,HashMap的get方法源码分析为:

public V get(Object key) {
	if (key == null)
		return getForNullKey();
	Entry<K,V> entry = getEntry(key);

	return null == entry ? null : entry.getValue();
}

若key为null,调用getForNullKey方法获取value对象

private V getForNullKey() {
	if (size == 0) {
		return null;
	}
	for (Entry<K,V> e = table[0]; e != null; e = e.next) {
		if (e.key == null)
			return e.value;
	}
	return null;
}

若key不为null,则调用getEntry方法,大致过程为:

1.调用hash(key)获取到hash值
2.调用indexFor方法,传入hash值和table数组的length长度,计算出bucketIndex
3.遍历table[bucketIndex]位置上的entry,判断条件为:(e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k))))

HashMap的getEntry方法对应源代码为:

final Entry<K,V> getEntry(Object key) {
	if (size == 0) {
		return null;
	}

	int hash = (key == null) ? 0 : hash(key);
	for (Entry<K,V> e = table[indexFor(hash, table.length)];
		 e != null;
		 e = e.next) {
		Object k;
		if (e.hash == hash &&
			((k = e.key) == key || (key != null && key.equals(k))))
			return e;
	}
	return null;
}

HashMap里边有一个很重要的方法,就是entrySet()方法
// entrySet是HashMap的成员变量
private transient Set> entrySet = null;
使用entrySet方法demo例子为:

// foreach迭代的时候,内部调用的是iterator迭代器处理的
	// 调用entrySet方法返回键值对数据集合
	Set<Map.Entry<KeyObject, ValueObject>> entrys = map.entrySet();
	for (Map.Entry<KeyObject, ValueObject> entry : entrys) {
		System.out.println("entry.key为:" + entry.getKey());
		System.out.println("entry.value为:" + entry.getValue());
	}
// entrySet方法内部调用的是entrySet0(),为何不合并成一个方法
public Set<Map.Entry<K,V>> entrySet() {
	return entrySet0();
}

// 首次调用entrySet()方法的时候entrySet为null,new一个新的entry set
private Set<Map.Entry<K,V>> entrySet0() {
	Set<Map.Entry<K,V>> es = entrySet;
	return es != null ? es : (entrySet = new EntrySet());
}

EntrySet内部处理源码为:

// 1.entryset包装成了一个类,里边最关键方法为iterator方法
private final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
	// entryset内部的迭代方法,iterator调用HashMap提供的newEntryIterator方法
	public Iterator<Map.Entry<K,V>> iterator() {
		// 见下文1.1 分析
		return newEntryIterator();
	}
	
	public boolean contains(Object o) {
		if (!(o instanceof Map.Entry))
			return false;
		Map.Entry<K,V> e = (Map.Entry<K,V>) o;
		Entry<K,V> candidate = getEntry(e.getKey());
		return candidate != null && candidate.equals(e);
	}
	
	public boolean remove(Object o) {
		return removeMapping(o) != null;
	}
	
	public int size() {
		return size;
	}
	
	public void clear() {
		HashMap.this.clear();
	}
}
// 1.1.newEntryIterator方法源码为:
Iterator<Map.Entry<K,V>> newEntryIterator()   {
	return new EntryIterator();// 见1.2分析构造出一个EntryIterator迭代器对象
}
// 1.2.EntryIterator迭代器类源码为:
private final class EntryIterator extends HashIterator<Map.Entry<K,V>> {
	// next方法里边调用nextEntry方法,内部使用的是父类的HashIterator迭代器方法
	public Map.Entry<K,V> next() {
		return nextEntry();// 见1.3分析,HashIterator迭代器是最为关键地方
	}
}
// 1.3.HashIterator迭代器类源码为:
private abstract class HashIterator<E> implements Iterator<E> {
	Entry<K,V> next;        // next entry to return
	int expectedModCount;   // For fast-fail
	int index;              // current slot
	Entry<K,V> current;     // current entry

	HashIterator() {
		// 构造hash iterator迭代器的时候先赋值expectedModCount为当前的modCount
		expectedModCount = modCount;
		if (size > 0) { // advance to first entry
			Entry[] t = table;
			// 首次,先定位到第一个不为null的entry节点index
			while (index < t.length && (next = t[index++]) == null)
				;
		}
	}

	// 判断entry是否有next下一个节点数据
	public final boolean hasNext() {
		return next != null;
	}

	final Entry<K,V> nextEntry() {
		// 在迭代的过程中,若遇到modCount修改了,则抛出ConcurrentModificationException异常
		if (modCount != expectedModCount)
			throw new ConcurrentModificationException();
		Entry<K,V> e = next;
		if (e == null)
			throw new NoSuchElementException();
		
		// 若为null,为了定位到下一个entry节点,此处采用while往下走方式
		if ((next = e.next) == null) {
			Entry[] t = table;
			while (index < t.length && (next = t[index++]) == null)
				;
		}
		current = e;
		return e;
	}

	public void remove() {
		if (current == null)
			throw new IllegalStateException();
		if (modCount != expectedModCount)
			throw new ConcurrentModificationException();
		Object k = current.key;
		current = null;
		HashMap.this.removeEntryForKey(k);
		expectedModCount = modCount;
	}
}

后续有时间在梳理下LinkedHashMap和TreeMap内部源码实现。

重定向和转发区别

负载均衡方式:

1 重定向:不容易扩展,一般不适用
2 转发:转发机制比较灵活,容易扩展,比如nginx,七层负载均衡,应用层做转发

重定向方式:

1 首次访问http://www.taobao.com
2 点击已经买到的宝贝https://buyertrade.taobao.com/trade/itemlist/list_bought_items.htm
3 重定向到https://login.taobao.com/member/login.jhtml?redirectURL=https://buyertrade.taobao.com/trade/itemlist/list_bought_items.htm

重定向特点:

1 http返回码为302
2 client端发起两次http请求
3 client端url输入栏变化了

转发特点:

1 访问url1
2 服务器内部做了转发url1到url2
3 client端用户url输入栏不会变化,一次http请求

OSI七层协议:

7 应用层	HTTP、FTP、NFS、SMTP、telnet
6 表示层	加密、ASCII等
5 会话层	RPC、SQL等
4 传输层	TCP、UDP、SPX
3 网络层	IP、IPX
2 数据链路层	ATM、FDDI
1 物理层	Rj45/802.3等

6-falcon如何做自我监控

http://book.open-falcon.com/zh/practice/monitor.html

我们把对监控系统的监控,称为监控系统的自监控。自监控的需求,没有超出监控的业务范畴。
同其他系统一样,自监控要做好两方面的工作: 故障报警和状态展示。故障报警,要求尽量实时的发现故障、及时的通知负责人,要求高可用性。
状态展示,多用于事前预测、事后追查,实时性、可用性要求 较故障报警 低一个量级。下面我们从这两个方面,分别进行介绍。

故障报警

故障报警相对简单。我们使用第三方监控组件AntEye,来监控Open-Falcon实例的健康状况。
Open-Falcon各个组件,都会提供一个描述自身服务可用性的自监控接口,描述如下。AntEye服务会定时巡检、主动调用Open-Falcon各实例的自监控接口,如果发现某个实例的接口没有如约返回”ok”,
就认为这个组件故障了(约定),就通过短信、邮件等方式 通知相应负责人员。
为了减少报警通知的频率,AntEye采用了简单的报警退避策略,并会酌情合并一些报警通知的内容。

AntEye地址为:https://github.com/niean/anteye

# API for my availability
接口URL
/health 检测本服务是否正常

请求方法
GET http://$host:$port/health
$host 服务所在机器的名称或IP
$port 服务的http.server监听端口

请求参数
无参数

返回结果(string)
“ok”(没有返回”ok”, 则服务不正常)

AntyEye组件主动拉取状态数据,通过本地配置加载监控实例、报警接收人信息、报警通道信息等,这样做,是为了简化报警链路、使故障的发现过程尽量实时&可靠。
AntEye组件足够轻量,代码少、功能简单,这样能够保障单个AntEye实例的可用性;同时,AntEye是无状态的,能够部署多套,这进一步保证了自监控服务的高可用。
在同一个重要的网络分区内,通常要部署3+个AntEye,如下图所示。我们一般不会让AntEye做跨网络分区的监控,因为这样会带来很多网络层面的误报。

多套部署,会造成报警通知的重复发送,这是高可用的代价;从我们的实践经验来看,这个重复可以接受。

值得注意的是,原来故障发现功能是Open-Falcon的Task组件的一个代码片段。
后来,为了满足多套部署的需求,我们把故障发现的逻辑从Task中剔除,转而使用独立的第三方监控组件AntEye。

对于自监控,简单整理下:
神医难自医。大型监控系统的故障监控,往往需要借助第三方监控系统(AntEye)。第三方监控系统,越简单、越独立,越好。
吃自己的狗粮。一个系统,充分暴露自身的状态数据,才更有利于维护。我们尽量,把状态数据的存储容器做成通用的、把获取状态数据的接口做成一致的、把状态数据的采集服务做成集中式的,
方便继承、方便运维。当前,程序获取自己状态数据的过程还不太优雅、入侵严重;如果您能指点一二,我们将不胜感激。

5-如何监控第三方redis和rabbitmq

如何监控mysql
https://github.com/open-falcon/mymon

mymon(MySQL-Monitor) — MySQL数据库运行状态数据采集脚本,采集包括global status, global variables, slave status等。

mkdir -p $GOPATH/src/github.com/open-falcon
cd $GOPATH/src/github.com/open-falcon
git clone https://github.com/open-falcon/mymon.git

cd mymon
go get ./...
go build -o mymon

echo '* * * * * cd $GOPATH/src/github.com/open-falcon/mymon && ./mymon -c etc/mon.cfg' > /etc/cron.d/mymon

此处使用的是crontab方式的定时任务处理

如何监控redis

http://book.open-falcon.com/zh/usage/redis.html

在数据采集一节中我们介绍了常见的监控数据源。open-falcon作为一个监控框架,可以去采集任何系统的监控指标数据,只要将监控数据组织为open-falcon规范的格式就OK了。
Redis的数据采集可以通过采集脚本redis-monitor来做。
redis-monitor是一个cron,每分钟跑一次采集脚本redis-monitor.py,其中配置了redis服务的地址,redis-monitor连到redis实例,采集一些监控指标,比如connected_clients、used_memory等等,然后组装为open-falcon规定的格式的数据,post给本机的falcon-agent。falcon-agent提供了一个http接口,使用方法可以参考数据采集中的例子。
比如,我们有1000台机器都部署了Redis实例,可以在这1000台机器上分别部署1000个cron,即:与Redis实例一一对应。

如何监控rabbitmq

http://book.open-falcon.com/zh/usage/rabbitmq.html

在数据采集一节中我们介绍了常见的监控数据源。open-falcon作为一个监控框架,可以去采集任何系统的监控指标数据,只要将监控数据组织为open-falcon规范的格式就OK了。

RMQ的数据采集可以通过脚本rabbitmq-monitor来做。

工作原理

rabbitmq-monitor是一个cron,每分钟跑一次脚本rabbitmq-monitor.py,其中配置了RMQ的用户名&密码等,脚本连到该RMQ实例,采集一些监控指标,比如messages_ready、messages_total、deliver_rate、publish_rate等等,然后组装为open-falcon规定的格式的数据,post给本机的falcon-agent。falcon-agent提供了一个http接口,使用方法可以参考数据采集中的例子。
比如我们部署了5个RMQ实例,可以在 每个RMQ实例机器上运行一个cron,即:与RMQ实例一一对应。

4-如何监控进程和端口

proc.num metric指标
tag支持两种方式,一种是name另外一种是cmdline

通过ps -ef|grep auth.nh.fdd 提取到pid
然后通过,cat /proc/{pid}/status查看如下数据为:

Name:	java
State:	S (sleeping)
Tgid:	41656
Pid:	41656
PPid:	1
TracerPid:	0
Uid:	506	506	506	506
Gid:	506	506	506	506
Utrace:	0
FDSize:	256
Groups:	506 
VmPeak:	 4153900 kB
VmSize:	 4088364 kB
VmLck:	       0 kB
VmHWM:	  441080 kB
VmRSS:	  434844 kB
VmData:	 3946684 kB
VmStk:	      88 kB
VmExe:	      36 kB
VmLib:	  112796 kB
VmPTE:	    1292 kB
VmSwap:	       0 kB
Threads:	49
SigQ:	0/256287
SigPnd:	0000000000000000
ShdPnd:	0000000000000000
SigBlk:	0000000000000000
SigIgn:	0000000000000002
SigCgt:	1000000181005ccd
CapInh:	0000000000000000
CapPrm:	0000000000000000
CapEff:	0000000000000000
CapBnd:	ffffffffffffffff
Cpus_allowed:	ffff,ffffffff
Cpus_allowed_list:	0-47
Mems_allowed:	00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000003
Mems_allowed_list:	0-1
voluntary_ctxt_switches:	1
nonvoluntary_ctxt_switches:	3

然后,通过/proc/{pid}/cmdline查看内容为:

/usr/java/default/jre/bin/java-Djava.util.logging.config.file=/data0/deploy/java/auth.nh.fdd/newhouse-authservice/conf/logging.properties-Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager-server-Xms256m-Xmx256m-XX:PermSize=128m-XX:MaxNewSize=128m-XX:MaxPermSize=128m-XX:+HeapDumpOnOutOfMemoryError-Denv=dev-DLogPath=/data0/log/auth.nh.fdd-XX:+UseParallelGC-XX:+UseParallelOldGC-Dtomcat.log=/tomcat/-Dlog_path=/app-Drunlog.dir=/app/runlog-Dbusilog.dir=/app/busilog-Dfile.encoding=UTF-8-Dtest_var=hello_world-Djava.endorsed.dirs=/data0/deploy/java/auth.nh.fdd/newhouse-authservice/endorsed-classpath/data0/deploy/java/auth.nh.fdd/newhouse-authservice/bin/bootstrap.jar:/data0/deploy/java/auth.nh.fdd/newhouse-authservice/bin/tomcat-juli.jar-Dcatalina.base=/data0/deploy/java/auth.nh.fdd/newhouse-authservice-Dcatalina.home=/data0/deploy/java/auth.nh.fdd/newhouse-authservice-Djava.io.tmpdir=/data0/deploy/java/auth.nh.fdd/newhouse-authservice/temporg.apache.catalina.startup.Bootstrapstart

cmdline中的内容是你的启动命令,这么说不准确,你会发现空格都没了。其实是把空格自动替换成\0了。不用关心,直接鼠标选中,拷贝之即可。
不要自以为是的手工加空格配置到策略中哈,监控策略的tag是不允许有空格的。
上面的例子,java -c uic.properties在cmdline中的内容会变成:java-cuic.properties,无需把整个cmdline都拷贝并配置到策略中。
虽然name这个tag是全匹配的,即用的==比较name,但是cmdline不是,我们只需要拷贝cmdline的一部分字符串,能够与其他进程区分开即可。比如上面的配置:

3-falcon默认采集指标

cpu指标:
cpu_num cpu总数(只收集一次)
cpu_speed 以MH作为单位的cpu速度(只收集一次)

memory指标:
proc_num 正在运行的进程总数
proc_total 进程总数
proc_thread_total 进程+线程总数

kernel指标:
boottime 系统最近启动时间
sys_clock 系统时钟时间
heartbeat 上一次心跳发送时间

处理命令为:

cpu指标:
cpu_num cpu总数(只收集一次)
cpu_speed 以MH作为单位的cpu速度(只收集一次)

memory指标:
proc_num 正在运行的进程总数 ps -r|wc -l
proc_total 进程总数 ps -aux|wc -l
proc_thread_total 进程+线程总数 ps -eLf|wc -l

kernel指标:
boottime 系统最近启动时间
sys_clock 系统时钟时间
heartbeat 上一次心跳发送时间

补充linux命令为:
ps -f –forest -C nginx tree树状结构,进程下边挂着线程
ps aux –sort=-pcpu|head -10 根据cpu排序,展示前10个进程
watch -n 1 ‘ps -e -o pid,uname,cmd,pmem,pcpu –sort=-pmem,-pcpu | head -15′

kernel指标:
kernel.files.allocated 已分配的文件句柄
kernel.files.left 剩余可分配的文件句柄
kernel.maxfiles 系统支持最大的openfiles
kernel.maxproc 系统支持最大的进程数

2-falcon使用介绍

open-falcon如何使用。

第一:如何查看监控数据

dashboard web项目,搜索监控数据的地方

左侧输入endpoint,endpoint都是机器名,去目标机器上执行hostname查看
勾选左侧的endpoint,然后右侧count搜索框输入,count=${metric}/sorted(${tags})

举例为count为:cpu.idle,在counter搜索框中输入cpu并回车,弹出的即为graph监控图形数据了

第二:如何配置报警策略

1.配置报警接收人

和传统的zabbix、nagios不同之处falcon配置的既不是手机号码也不是邮箱,因为:
每次因为人员手机号码或邮箱变化,就去修改各个机器上的配置是繁琐又不合理的地方。

falcon抽象出了uic的概念,只要在uic配置好对应的人员:手机号码、邮箱即可。
另外,抽象出了team组的概念,服务告警99%的场景都是要通知一个team,将人员划分到team里边即可。

2.创建HostGroup

举例为对falcon-judge这个组件做端口监听,首先需要创建一个HostGroup,将所有部署了falcon-judge这个模块的机器都加进去,
这样做的好处是,以后扩容或下线机器的时候直接从这个HostGroup增删机器即可,报警策略会自动生效、失效。

小米开发建议取名规范为:

sa.dev.falcon.judge 名称规范为:
sa 是我们部门
dev 是我们开发组
falcon 是项目名
judge 是组件名,推荐命名必须规范,否则业务多了无法管理了

结合我们业务命名为,举例poc规范为:
xf.dev.poc.avg xf:新房,dev:开发,poc:项目名,avg:接口平均调用时间ms
xf.dev.poc.* 待定统一规范化,参考zabbix现行命名规范

在往组里加机器的时候如果报错,需要检查portal的数据库中host表,看里边是否有相关机器。那host表中的机器从哪里来呢?

agent有个heartbeat(hbs)的配置,agent每分钟会发心跳给hbs,把自己的ip、hostname、agent version等信息告诉hbs,hbs负责写入host表。

如果host表中没数据,需要检查这条链路是否通畅。

3.创建策略模板

portal上方有个template连接,策略模板管理入口。
创建模板名称为:sa.dev.falcon.judge,姑且和HostGroup名称相同,设置一个端口监控。通常进程监控方法有两种:
一是:进程本身是否存活
另外:端口是否在监听,此处我们使用端口监控。

模板策略加上
metric设置为:net.port.listen
tags设置为:port=6080
max设置为:3
P设置为:0
note设置为:judege挂了
if设置为:all(#2) == 0

右上角那个加号按钮是用于增加策略的,一个模板中可以有多个策略,此处我们只添加了一个。

下面可以配置报警接收人,此处填写的是falcon,这是之前在UIC中创建的Team。

4.将HostGroup和模板绑定

一个模板是可以绑定到多个HostGroup的,现在我们重新回到HostGroups页面,找到sa.dev.falcon.judge这个HostGroup,
右侧有几个超链接,点击【templates】进入一个新页面,输入模板名称,绑定一下就行了。

上面的策略只是对falcon-judge做了端口监控,那如果我们要对falcon这个项目的所有机器加一些负载监控,应该如何做呢?

创建一个HostGroup:sa.dev.falcon,把所有falcon的机器都塞进去
创建一个模板:sa.dev.falcon.common,添加一些像cpu.idle,load.1min等策略
将sa.dev.falcon.common绑定到sa.dev.falcon这个HostGroup

大家可能不知道各个指标分别叫什么,自己push的数据肯定知道自己的metric了,
agent push的数据可以参考:https://github.com/open-falcon/agent/tree/master/funcs

5.如何配置策略表达式

策略表达式,即expression,具体可以参考HostGroup与Tags设计理念,这里只是举个例子

设置expression为:
each(metric=qps project=falcon module=judge)
if all(#3) > 1000
最多报警3次

falcon-judge这个模块的所有实例,如果qps连续3次大于1000,就报警给falcon这个报警组。
expression无需绑定到HostGroup,enjoy it

更多思考参考wiki为:

http://book.open-falcon.com/zh/philosophy/tags-and-hostgroup.html

1-falcon初认识

监控系统关键点

1 监控指标数据采集
2 数据绘图功能
3 告警通知功能

监控系统有哪些

1 zabbix	使用广泛,有告警机制
2 ganglia	无告警机制	集群大规模分布式	久经考验
3 nagios	告警	告警配置比较复杂,不灵活
4 open-falcon	模块化	集群大规模分布式	时间比较短

open-falcon监控一些指标数据:

监控数据/data0占用情况,此处为rrd文件占用情况
监控数据,一个文件约为100K,100台*200个监控项*100KByte=2G磁盘占用
以此类推,1000台机器约为20G磁盘占用。

java内存模型和垃圾回收

本文目录:

1.java内存模型
2.java垃圾回收器
3.常见命令

以下内容来自zzm深入java虚拟机教程:
1.java内存模型
java运行期内存模型

1.1.方法区
方法区=永久代+常量池
方法区是各个线程共享的内存区域,用于存放已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。
方法区别名叫做Non-Heap非堆,目的应该是和java堆区分开来。

永久代,Permannent Generation,对应JVM参数为:
-XX:MaxPermSize 永久代最大内存大小
-XX:PermSize 永久代初始分配的内存大小

方法区OutOfMemoryError演示demo例子为:

package com.oom.study;

import java.lang.reflect.Method;

import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

/**
 * 方法区:永久代+常量池
 * 
 * -XX:MaxPermSize=5m -XX:PermSize=5m
 * 
 * @author XiusongHU
 */
public class JavaMethodAreaExample {

    public static void main(String[] args) {
        // 通过cglib字节码技术,增强类越多,就越容易达到上限
        while (true) {
            // 定义并实例化enhancer对象
            Enhancer enhancer = new Enhancer();
            // 设置super class类对象
            enhancer.setSuperclass(HelloWorld.class);
            // 设置禁用缓存
            enhancer.setUseCache(false);
            // 设置callback回调方法
            enhancer.setCallback(new MethodInterceptor() {

                @Override
                public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
                    return proxy.invokeSuper(obj, args);
                }

            });
            // 通过create方法创建出java对象
            HelloWorld demo = (HelloWorld) enhancer.create();

            // 设置对象的成员变量
            demo.name = "hello world";
            // 调用对象的sayHello方法
            demo.sayHello();
        }
    }

    /**
     * 定义一个name成员变量以及定义一个sayHello方法
     * 
     * @author XiusongHU
     */
    public static class HelloWorld {
        /** 成员变量name,用户名称 */
        private String name;

        /**
         * 成员方法sayHello
         */
        private void sayHello() {
            System.out.println("I am " + name);
        }
    }

}

输出结果为:由于设定的PermSize=5m,第二次循环的时候就已经OutOfMemoryError

I am hello world
I am hello world
Exception in thread "Reference Handler" Exception in thread "main" java.lang.OutOfMemoryError: PermGen space
// 遇到问题1;final class无法被cglib做set superclass处理
Exception in thread "main" java.lang.IllegalArgumentException: Cannot subclass final class com.oom.study.JavaMethodAreaExample$HelloWorld
    at net.sf.cglib.proxy.Enhancer.generateClass(Enhancer.java:457)
    at net.sf.cglib.core.DefaultGeneratorStrategy.generate(DefaultGeneratorStrategy.java:25)
    at net.sf.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:231)
    at net.sf.cglib.proxy.Enhancer.createHelper(Enhancer.java:378)
    at net.sf.cglib.proxy.Enhancer.create(Enhancer.java:286)
    at com.oom.study.JavaMethodAreaExample.main(JavaMethodAreaExample.java:37)

常量池:运行期常量池(Runtime Constant Pool)是方法区的一部分。
class文件中除了有类的版本、字段、方法、接口等描述信息后,还有一项信息是常量池,
用于存放编译器生成的各种字面量和符号引用,这部分内容酱紫啊类加载后进入方法区的运行时常量池中存放。

关于常量池还有一点补充:
运行时常量池相对于class文件常量池的另外一个重要特性是具备动态性,java语言并不要求常量一定能只有编译器才能产生,
也就是并非预置入class文件中常量池的内容才能进入方法区运行时常量池,运行期间也可能将新的常量放入池中,比如String类的intern()方法。

常量池OutOfMemoryError演示demo例子为:

package com.oom.study;

import java.util.ArrayList;
import java.util.List;

/**
 * 方法区:永久代+常量池(非堆Non-Heap)
 * 
 * 方法区:类描述信息、方法、字段描述信息
 * 常量池:编译期生成的字面量和符号引用,运行期也能加到常量池中,比如String.intern()方法
 * 
 * -XX:PermSize=10M -XX:MaxPermSize=10M
 * 
 * @author XiusongHU
 */
public class ConstantsOutOfMemoryExample {

    public static void main(String[] args) {
        List<String> list = new ArrayList<String>();
        // while循环,模拟出现常量池内存溢出
        int i = 0;
        while (true) {
            // 生成string字符串使用intern放到常量池中
            list.add(String.valueOf(i++).intern());
        }
    }

}

1.2.java虚拟机栈
java虚拟机栈是线程私有的,它的生命周期与线程相同,虚拟机栈描述的是Java方法执行的内存模型,
每个方法在执行的同时,都会创建一个栈帧stack frame,用于存储局部变量表、操作数栈、动态链接、方法出口等信息。
每一个方法从调用直至执行完成的过程,就对应着一个栈帧在虚拟机栈中入栈到出栈的过程。

局部变量表存放了编译期可知的各种基本数据类型(boolean、byte、char、short、int、long、float、double)、对象引用等。

package com.oom.study;

/**
 * 基本数据类型demo例子
 * 
 * @author XiusongHU
 */
public class BasicDataExample {

    public static void main(String[] args) {
        // boolean类型,true或false
        boolean b = false;
        // byte字节,1byte=8bit
        byte b1 = 1;
        // char字符,一个字符
        char c1 = 'c';
        // short短整型,1short=2byte
        short s1 = 1;
        // int整型,1int=4byte
        int i1 = 1;
        // long长整型,1long=8byte
        long l1 = 1L;
        // float单精度类型,1float=4byte
        float f1 = 1.0f;
        // double双精度类型,1d=8byte
        double d1 = 1.0d;
    }

}

当线程请求的栈深度超过了虚拟机所支持的最大栈深度,将抛出StackOverFlowError异常

package com.oom.study;

/**
 * -Xss128k 栈大小设置,每个线程分配栈内存不能超过128k
 * 
 * @author XiusongHU
 */
public class StackOverFlowExample {

    public static void main(String[] args) {
        // 定义并实例化HelloWorld对象
        HelloWorld obj = new HelloWorld();
        // 调用counter方法,counter方法内部做嵌套递归调用
        obj.counter();
    }

    public static class HelloWorld {
        private int i;

        // 线程请求的栈深度超过虚拟机所支持的最大深度,将会抛出StackOverFlowError异常
        public void counter() {
            i++;
            counter();
        }

        public int getI() {
            return i;
        }

        public void setI(int i) {
            this.i = i;
        }
    }

}

虚拟机栈若线程创建过多,占用的内存又没有释放掉,无法申请足够多的内存空间,将抛出OutOfMemoryError异常

package com.oom.study;

/**
 * -Xss2M 分配给每个线程栈足够大,2M,不断创建线程,没有释放内存,模拟出现OutOfMemoryError
 * 
 * @author XiusongHU
 */
public class StackOutOfMemoryExample {

    private void dontStop() {
        while (true) {
        }
    }

    private void stackLeakByThread() {
        while (true) {
            // 定义并实例化线程thread对象
            Thread thread = new Thread(new Runnable() {

                @Override
                public void run() {
                    // 不断创建出线程,然后每个线程执行run方法,调用dontStop方法做while死循环处理
                    dontStop();
                }

            });
            // 启动线程
            thread.start();
        }
    }

    public static void main(String[] args) {
        StackOutOfMemoryExample example = new StackOutOfMemoryExample();
        example.stackLeakByThread();
    }
}

1.3.本地方法栈
本地方法栈native method stack与虚拟机栈所发挥的作用是非常相似的,他们之间的区别不过是虚拟机栈执行java方法,也就是字节码服务,
而本地方法栈则为虚拟机使用到的native方法服务。

本地方法栈也会抛出StackOverFlowError和OutOfMemoryError异常。

1.4.java堆内存
java堆,java heap是java虚拟机所管理的内存中最大的一块,java堆是被所有线程共享的一块内存区域,在虚拟机启动的时候创建。
java堆内存区域唯一的目的就是存放对象的实例,几乎所有的对象实例都在这边分配内存。

java堆细分为:新生代和老年代,其中新生代又划分为:Eden、From Survivor空间和To Survivor空间等。
若堆中没有内存完成实例分配,并且堆也无法再扩展时,将会抛出OutOfMemoryError异常。

演示堆内存无法继续完成实例分配抛出异常demo

package com.oom.study;

import java.util.ArrayList;
import java.util.List;

/**
 * -Xms20m -Xmx20m -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGC -XX:+PrintGCDetail -XX:+PrintGCTimeStamps
 * 
 * GC参数配置说明:
 * -XX:+PrintGC 打印出GC日志
 * -Xloggc:filename GC日志重定向某个filename文件
 * -XX:+PrintGCDetail 打印出GC日志详情
 * -XX:+PrintGCTimeStamps 打印出GC日志时间戳
 * -Xms 最小堆参数
 * -Xmx 最大堆参数
 * -XX:+HeapDumpOnOutOfMemoryError heap溢出后生成堆内存镜像文件
 * 
 * jmap dump:format=filename.bin <pid>
 * 
 * @author XiusongHU
 */
public class HeapOutOfMemoryExample {

    public static void main(String[] args) {
        List<HelloWorld> list = new ArrayList<HelloWorld>();
        
        // 循环创建对象,模拟出现堆内存溢出场景
        while(true){
            // new Object()对象放在堆上
            list.add(new HelloWorld());
        }
    }

    static class HelloWorld {
        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

    }

}

1.5.程序计数器
程序计数器program counter register是一块较小的内存空间,他可以看做是当前线程所执行的字节码的行号指示器。
字节码解释权工作时就是通过改变这个计数器的值来选取下一条需要执行的字节码指令,分支、循环、跳转、异常处理、线程恢复等基础功能需要依赖这个程序计数器来完成。
程序计数器是唯一一个在java虚拟机规范中没有规定任何OutOfMemoryError情况的区域。

2.java垃圾回收器
2.1.垃圾回收算法
*标记-清除 Mark-Sweep
*标记-整理 Mark-Compact
*复制算法 Copying

标记-清除缺点为:
1.效率问题:标记和清除两个阶段效率都不高。
2.标记清除后,会产生大量不连续的内存碎片。

复制算法copying缺点为:
1.将内存压缩原来的一半,总是有一部分的内存是闲置的。
2.在对象存活率比较高的情况下,需要进行较多的复制,效率将会变低。

标记-整理算法在标记阶段和标记-清除算法一致,但是后续步骤不是直接对可回收对象进行清理,而是让所有存活的对象往一端移动,然后直接清理掉端边界以外的内存。
因此,标记-整理缺点为:
1.标记阶段效率不高,和标记-清除算法一致。
2.若存活对象较多,移动到另外一端带来不少开销。

2.2.java垃圾回收器
分代垃圾回收器

新生代:采用复制copying算法进行垃圾回收

SerialGC 单线程串行垃圾回收器,stop all the world,用户线程和GC线程串行。-XX:useSerialGC
ParaNewGC 在SerialGC基础上,多线程版本实现,回收过程同样需要停顿用户线程,在单CPU情况下,ParaNewGC回收效率不如SerialGC。-XX:useParaNewGC
Parallel Scavenge 并行多线程版实现,考虑的是“吞吐量优先”原则,适合后台类定时调度进程服务,不适用前台类,用户敏感停顿的进程服务。-XX:useParallelGC

老年代:标记-清除/整理算法进行垃圾回收

Serial Old 单线程版本,stop the world,GC的时候用户线程必须停顿下来,-XX:UseSerialGC
Parallel Old 多线程并行回收,-XX:UseParallelOldGC
CMS Concurrent Mark Sweep 并发多线程标记清除回收器,-XX:ConcurrentMarkSweepGC
CMS回收器的缺点:
1.对CPU敏感,需要多核支持,并发效率才高
2.无法处理浮动的垃圾,因为使用的是标记-清除算法,CMS提供了扩展标记-整理
3.收集结束后会产品内存碎片

3.常见命令

jps -l
jstat -gcutil 1000 10 pid
jinfo pid
jmap dump:format=filename.bin pid
jstack pid

另外,推荐整理文章《理解Java虚拟机体系结构》

rabbitmq讲稿梳理

教程提纲
1.MQ是什么
2.为什么需要MQ
3.JMS VS AMQP
4.RabbitMQ是什么
5.RabbitMQ核心概念
6.RabbitMQ通信过程
7.RabbitMQ使用场景

教程内容
1.MQ是什么
MQ全称:Message Queue消息队列

开源MQ产品:
RabbitMQ http://www.rabbitmq.com/
ActiveMQ http://activemq.apache.org/
ZeroMQ http://zeromq.org/
RocketMQ https://github.com/alibaba/RocketMQ

2.为什么需要MQ
先思考这两个问题:
1.没有MQ解决方案?
2.MQ带来了什么?

可能的答案:
1.系统解耦
2.扩展性
3.灵活性和峰值处理能力
4.可恢复性
5.排序保证
6.异步通信

接着,思考下边6个问题:
1.发送方和接收方如何维持链接,一方断开后数据丢失如何弥补?
2.如何降低发送方和接收方的耦合?解耦?
3.如何让priority优先级高的先接收到消息?
4.如何做load balance,有效均衡接收方负载?
5.如何将有效的数据发送给相关的接收方?接收方订阅subscribe不同的数据?
6.如何保证接收方接收到完整、正确的数据?

AMQP协议规范,解决了上述全部问题。

3.JMS VS AMQP
JMS是什么
JMS(JAVA消息服务)是一组标准的API,能够用于访问多种消息服务器。

JMS主要包括四个对象Object,分别为:
1.受控对象Administered Object
2.JMS提供者
3.JMS客户机
4.Message消息

传统消息传送模式一般分为两种:
1.点对点通信,Point to Point
2.发布和订阅,Pub and Sub

JMS API是在javax.jms包中定义的,一般过程为:
1.创建一个提供连接对象的连接工厂。
2.连接对象提供与消息服务器的链接。
3.链接被用来创建会话。
4.会话被用来创建消息。
5.消息通过消息的生产者发送到目标(队列或主题)。
6.消息传递到消费者。

消息是消息服务器在客户端之间发送的一条条消息,有五种接口,不同类型消息。
1.StreamMessage Java原始值的数据流
2.MapMessage 一组名/值对
3.TextMessage 一个字符串对象
4.ObjectMessage 一个序列化的Java对象
5.BytesMessage 一个未解释字节的数据流

消息由以下几部分组成:
1.header 消息头:JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。
2.property 属性:由消息发送者产生,用来添加删除消息头以外的附加信息。
3.body 消息体:由消息发送者产生。

一个JMS应用是几个JMS客户端交换消息,开发JMS客户端应用由以下几步构成:
1.用JNDI提取到ConnectionFactory对象。
2.用ConnectionFactory创建Connection对象。
3.用Connection对象创建一个或多个JMS Session对象。
4.用户JNDI得到目标队列或主题对象,即Destination对象。
5.用Session和Destination创建MessageProducer和MessageConsumer。
6.通知Connection开始传递消息。

AMQP是什么
AMQP全称:Advanced Message Queuing Protocol提供统一消息服务的应用层标准高级消息队列协议。

AMQP三要素:
1.exchange 交换器
2.binding key 绑定规则
3.queue 队列

类比:邮件服务器
1.exchange VS 邮件代理服务
2.binding key VS 每一个传输代理中的邮箱路由表
3.queue VS email邮箱
4.message VS email邮件

AMQP概念介绍
1.连接(Connection):一个网络连接,比如TCP/IP套接字连接。
2.会话(Session):端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
3.信道(Channel):多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
4.客户端(Client):AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
5.服务器(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程。也称为“消息代理”。
6.端点(Peer):AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
7.消息头(Header):描述消息数据属性的一种特殊段。
8.消息体(Body):包含应用程序数据的一种特殊段。消息体段对于服务器来说完全不透明——服务器不能查看或者修改消息体。
9.消息内容(Content):包含在消息体段中的的消息数据。
10.交换器(Exchange):服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
11.交换器类型(Exchange Type):基于不同路由语义的交换器类。
12.消息队列(Message Queue):一个命名实体,用来保存消息直到发送给消费者。
13.绑定器(Binding):消息队列和交换器之间的关联。
14.绑定器关键字(Binding Key):绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
15.路由关键字(Routing Key):一个消息头,交换器可以用这个消息头决定如何路由某条消息。
16.持久存储(Durable):一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
17.临时存储(Transient):一种服务器资源,当服务器重启时,保存的消息数据会丢失。
18.持久化(Persistent):服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
19.非持久化(Non-Persistent):服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
20.消费者(Consumer):一个从消息队列中请求消息的客户端应用程序。
21.生产者(Producer):一个向交换器发布消息的客户端应用程序。
22.虚拟主机(Virtual Host):一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。

4.RabbitMQ是什么
AMQP是提供统一消息服务的应用层标准高级消息队列协议。
RabbitMQ是消息队列产品,实现了AMQP协议规范。

5.RabbitMQ核心概念
务必理解清楚:
1.vhosts 域,权限控制
2.exchange 交换器
3.routing key 路由规则键
4.binding key 绑定规则键
5.queue 队列

6.RabbitMQ通信过程
发送方:
1.在RabbitMQ中声明Connection
2.在RabbitMQ中声明Channel
3.生产者获取Connection
4.生产者获取Channel
5.定义Exchange、Queue
6.使用一个RoutingKey将Queue Binding到一个Exchange上
7.通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上
接收方:
1.获取connection
2.获取channel
3.从指定的queue上获取消息
接收方对exchange、routing key以及如何binding是不关心的

7.RabbitMQ使用场景
使用场景:交换器的类型,分为三种:
1.direct 直接方式
2.fanout 广播方式
3.topic 主题方式

python http调用例子

有段时间没有操作python了,测试同事帮忙调试python测试脚本
重新整理了两个python http demo例子,一个是get请求另外一个是post请求。

发送get请求demo例子:

#!/usr/bin/python
# -*- coding: utf-8 -*-
# hello_http_get.py

import httplib

#请求头
headers = {
	"cityId":"1337",
	"userId":"1234",
	"token":"abcd",
	"device-id":"778E5992-6DD7-4AB9-B2CC-741F31134D6C",
	"caller-id":"python-test",
	"request-id":"2298760f18a846969725a7752331e3b8",
	"platform":"5",
	"platformVersion":"21",
	"apiVersion":"21",
	"Content-type":"application/json"
}
#请求参数
params=''
#请求资源
resource_uri='/helloworld'
#请求方法
method_type='GET'

httpClient=None

try:
	print headers
	print params
	httpClient=httplib.HTTPConnection('localhost',9020,timeout=30)
	httpClient.request(method_type,resource_uri,params,headers)
	
	#获取返回数据对象
	response=httpClient.getresponse()
	print response.status #HTTP返回状态
	print response.reason #HTTP返回状态描述
	print response.read() #输出返回数据
except Exception,e:
	print e
finally:
	if httpClient:
		httpClient.close()

发送post请求demo例子:

#!/usr/bin/python
# -*- coding: utf-8 -*-
# hello_http_post.py

import httplib
import urllib
import json

#请求头
headers = {
	"cityId":"1337",
	"userId":"1234",
	"token":"abcd",
	"device-id":"778E5992-6DD7-4AB9-B2CC-741F31134D6C",
	"caller-id":"python-test",
	"request-id":"2298760f18a846969725a7752331e3b8",
	"platform":"5",
	"platformVersion":"21",
	"apiVersion":"21",
	"Content-type":"application/json"
}
#请求参数
params={
    "houseId" : "24230",
    "houseTypeId" : "0",
    "userPhone" : "13866554433",
    "userName" : "张三",
    "houseName" : "helloworld测试楼盘",
}
#服务端约定为json格式
params=json.dumps(params)
#服务端约定为body:k1=v1&k2=v2格式
#params=urllib.urlencode(params)
#请求资源
resource_uri='/helloworld'
#请求方法
method_type='POST'

httpClient=None

try:
	print headers
	print params
	httpClient=httplib.HTTPConnection('localhost',9020,timeout=30)
	httpClient.request(method_type,resource_uri,params,headers)
	
	#获取返回数据对象
	response=httpClient.getresponse()
	print response.status #HTTP返回状态
	print response.reason #HTTP返回状态描述
	print response.read() #输出返回数据
except Exception,e:
	print e
finally:
	if httpClient:
		httpClient.close()

第二篇:初探logstash

本文目录为:
1.logstash介绍
2.logstash安装
3.logstash入门使用
4.logstash参考文档

具体内容为:
1.logstash介绍

初探logstash

logstash是一个实时、开源数据采集项目。
logstash主要包括三个部分:输入、过滤以及输出插件组成。
logstash数据源包括但不局限于:web网页、数据库、路由器、系统日志等等。
logstash涉及功能包括但不局限于:Analysis数据分析、Archiving数据归档、Monitoring监控、Alerting告警等等。

logstash数据源
Logs and Metrics

1.apache web logs
2.application logs like log4j for java
3.syslog windows event logs networking and firewall logs and more
4.metrics from ganglia JMX and many other infrastructure and application platforms over TCP and UDP

2.logstash安装

第一步:安装好java环境

java -version
java version "1.7.0_55"
Java(TM) SE Runtime Environment (build 1.7.0_55-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode)

第二步:下载解压安装logstash环境
logstash下载官方地址为:
https://www.elastic.co/downloads/logstash

cd /usr/local
wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz
tar -xzvf logstash-2.0.0.tar.gz
ln -s logstash-2.0.0 logstash

第三步:验证logstash服务

启动logstash进程,input和output设置为std控制台流

cd /usr/local/logstash
bin/logstash -e 'input { stdin { } } output { stdout {} }'

启动成功后,控制台输出为:

Default settings used: Filter workers: 8
Logstash startup completed

尝试输入hello world,输出过程为:

hello world # 这是stdin输入处理
2015-11-27T08:21:56.045Z 0.0.0.0 hello world # 这是stdout输出处理

到此,logstash安装和验证完毕,通过ctrl+D进行退出处理。

3.logstash入门使用

logstash核心主要分为三部分,这三部分都是以plugin插件方式结合工作的。

Input plugin 输入plugin插件
Filter plugin 过滤器plugin插件
Output plugin 输出plugin插件

数据流程pipeline过程为:

Data Source ==> Input plugin + Filter plugin + Output plugin ==> Data Destination
Data Source:数据源
Input plugin:输入相关plugin插件
Filter plugin:过滤器相关plugin插件
Output plugin:输出相关plugin插件
Data Destination:数据目标

第一步:设置好logstash配置文件
文件名为:first-pipeline.conf

input{
	file{
		path => "/data0/logs/nginx/access.log"
		start_position => beginning
	}
}

filter{
	grok{
		match => {"message" => "%{COMBINEDAPACHELOG}"}
	}
	geoip{
		source => "clientip"
	}
}

output{
	elasticsearch{}
	stdout{}
}

第二步:verify验证参数配置命令

cd /usr/local/logstash
bin/logstash -f conf/first-pipeline.conf --configtest

若成功将会提示为:Configuration OK

第三步:启动logstash实例服务

cd /usr/local/logstash
bin/logstash -f conf/first-pipeline.conf

遇到失败,错误日志为:

Attempted to send a bulk request to Elasticsearch configured at '["http://localhost:9200/"]', but Elasticsearch appears to be unreachable or down! 
{:client_config=>{:hosts=>["http://localhost:9200/"], :ssl=>nil, :transport_options=>{:socket_timeout=>0, :request_timeout=>0, :proxy=>nil, :ssl=>{}}, 
:transport_class=>Elasticsearch::Transport::Transport::HTTP::Manticore, :logger=>nil, :tracer=>nil, :reload_connections=>false, :retry_on_failure=>false, 
:reload_on_failure=>false, :randomize_hosts=>false}, :error_message=>"Connection refused", :level=>:error}

问题原因为:本机没有安装好elasticsearch服务,安装过程为:

cd /usr/local
wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.0.0/elasticsearch-2.0.0.tar.gz
tar -xzvf elasticsearch-2.0.0.tar.gz
ln -s elasticsearch-2.0.0 elasticsearch
bin/elasticsearch -d # 加-d后台启动

不能使用root账号启动elasticsearch服务,此处使用java账号进行启动。

su java
cd /usr/local
bin/elasticsearch -d # 加-d后台启动
ps -ef|grep elasticsearch # 查看java进程信息
java      3196     1 45 17:02 pts/1    00:00:04 /usr/local/java/default/bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -XX:+DisableExplicitGC -Dfile.encoding=UTF-8 -Djna.nosys=true -Des.path.home=/usr/local/elasticsearch -cp /usr/local/elasticsearch/lib/elasticsearch-2.0.0.jar:/usr/local/elasticsearch/lib/* org.elasticsearch.bootstrap.Elasticsearch start -d

启动完毕后,验证elasticsearch服务为:

curl -X GET http://localhost:9200/
# 输出结果为:
{
  "name" : "John Ryker",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "2.0.0",
    "build_hash" : "de54438d6af8f9340d50c5c786151783ce7d6be5",
    "build_timestamp" : "2015-10-22T08:09:48Z",
    "build_snapshot" : false,
    "lucene_version" : "5.2.1"
  },
  "tagline" : "You Know, for Search"
}

处理elasticsearch完毕之后,回到logstash验证elasticsearch功能为:

# Replace $DATE with the current date, in YYYY.MM.DD format
# curl -X GET 'localhost:9200/logstash-$DATE/_search?q=response=200'
# 查询2015.11.27日索引数据,response HTTP返回码为200请求记录
curl -X GET 'localhost:9200/logstash-2015.11.27/_search?q=response=200'

输出结果为:

{
	"took" : 2,
	"timed_out" : false,
	"_shards" : {
		"total" : 5,
		"successful" : 5,
		"failed" : 0
	},
	"hits" : {
		"total" : 2,
		"max_score" : 0.641015,
		"hits" : [{
				"_index" : "logstash-2015.11.27",
				"_type" : "logs",
				"_id" : "AVFIMbva-vw3-VYBSPSQ",
				"_score" : 0.641015,
				"_source" : {
					"message" : "10.0.41.80 - - [27/Nov/2015:14:29:30 +0800] \"GET /resource/list/3.json HTTP/1.1\" 200 1392 \"http://hello.com/html/PageManager.html?sys_id=3\" \"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36\"",
					"@version" : "1",
					"@timestamp" : "2015-11-27T09:06:35.577Z",
					"host" : "0.0.0.0",
					"path" : "/data0/logs/nginx/access.log",
					"clientip" : "10.0.41.80",
					"ident" : "-",
					"auth" : "-",
					"timestamp" : "27/Nov/2015:14:29:30 +0800",
					"verb" : "GET",
					"request" : "/resource/list/3.json",
					"httpversion" : "1.1",
					"response" : "200",
					"bytes" : "1392",
					"referrer" : "\"http://hello.com/html/PageManager.html?sys_id=3\"",
					"agent" : "\"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36\""
				}
			}, {
				"_index" : "logstash-2015.11.27",
				"_type" : "logs",
				"_id" : "AVFIMbva-vw3-VYBSPSR",
				"_score" : 0.641015,
				"_source" : {
					"message" : "10.0.41.80 - - [27/Nov/2015:14:29:30 +0800] \"GET /favicon.ico HTTP/1.1\" 200 4286 \"http://hello.com/html/PageManager.html?sys_id=3\" \"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36\"",
					"@version" : "1",
					"@timestamp" : "2015-11-27T09:06:35.577Z",
					"host" : "0.0.0.0",
					"path" : "/data0/logs/nginx/access.log",
					"clientip" : "10.0.41.80",
					"ident" : "-",
					"auth" : "-",
					"timestamp" : "27/Nov/2015:14:29:30 +0800",
					"verb" : "GET",
					"request" : "/favicon.ico",
					"httpversion" : "1.1",
					"response" : "200",
					"bytes" : "4286",
					"referrer" : "\"http://hello.com/html/PageManager.html?sys_id=3\"",
					"agent" : "\"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36\""
				}
			}
		]
	}
}

4.logstash参考文档
参考doc文档地址为:https://www.elastic.co/guide

第一篇:初探ELK

最近团队在试用ELK,团队同事整理的关键点为:

1.日志分析和监控常见需求
2.团队在日志方面遇到的问题
3.常见一些开源trace系统
4.ELK角色功能分工

日志分析和监控常见需求为:

1.根据关键字查询日志详情
2.监控系统的运行情况,程序异常及时自动触发告警
3.统计分析,比如接口的调用次数、执行时间、成功率等等
4.基于日志的数据挖掘
5.安全、审计方面的需求

很多团队在日志方面遇到的常见问题为:

1.日志数据分散在多个服务器上,难以查找
2.开发人员需通过跳板机,然后登录线上服务器查看日志,大费周章
3.日志数据量大,通过一些常见的shell命令查询效率依旧不高
4.一个调用涉及多个系统的日志,没有调用链跟踪,问题难以定位

常见的一些重量级的开源Trace系统为:

1.Facebook scribe
2.cloudera flume
3.twitter zipkin
4.storm

对于日志来说,最常见的需求就是收集、查询、展示,对应解决方案为:

1.收集:logstash
2.查询:elasticsearch
3.展示:kibana

这是第一篇入门整理文档,后边将单独整理logstash、elasticsearch、kibana介绍、安装和使用。