BloomFilter + Redis 大数据去重策略的实现
1,179 views
1
米扑博客在上一篇已经介绍了《Bloom Filter 算法处理海量数据》,本文将直接介绍应用实践。
可以使用JDK自带的BitSet来实现,但存在两个问题:OOM、持久化问题。
结合Redis的BitMap能够解决,唯一需要注意的是Redis的BitMap只支持2^32大小,对应到内存也就是512MB,数组的下标最大只能是2^32-1。
不过这个限制可以这样解决:构建多个Redis的Bitmap,再用hash取模的方式把数据分散到各个Bitmap上。按万分之一的误判率计算,512MB大约可以放下2亿条数据。
好了,扯了这么多,贴代码!
@Component public class BloomFilter<E> { @Autowired private RedisTemplate<String, E> redisTemplate; @Value("${bloomfilter.expireDays}") private long expireDays; // total length of the Bloom filter private int sizeOfBloomFilter; // expected (maximum) number of elements to be added private int expectedNumberOfFilterElements; // number of hash functions private int numberOfHashFunctions; // encoding used for storing hash values as strings private final Charset charset = Charset.forName("UTF-8"); // MD5 gives good enough accuracy in most circumstances. Change to SHA1 if it's needed private static final String hashName = "MD5"; private static final MessageDigest digestFunction; // The digest method is reused between instances static { MessageDigest tmp; try { tmp = java.security.MessageDigest.getInstance(hashName); } catch (NoSuchAlgorithmException e) { tmp = null; } digestFunction = tmp; } public BloomFilter() { this(0.0001, 600000); } /** * Constructs an empty Bloom filter. * * @param m is the total length of the Bloom filter. * @param n is the expected number of elements the filter will contain. * @param k is the number of hash functions used. */ public BloomFilter(int m, int n, int k) { this.sizeOfBloomFilter = m; this.expectedNumberOfFilterElements = n; this.numberOfHashFunctions = k; } /** * Constructs an empty Bloom filter with a given false positive probability. * The size of bloom filter and the number of hash functions is estimated * to match the false positive probability. * * @param falsePositiveProbability is the desired false positive probability. * @param expectedNumberOfElements is the expected number of elements in the Bloom filter. 
*/ public BloomFilter(double falsePositiveProbability, int expectedNumberOfElements) { this((int) Math.ceil((int) Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2))) * expectedNumberOfElements / Math.log(2)), // m = ceil(kn/ln2) expectedNumberOfElements, (int) Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2)))); // k = ceil(-ln(f)/ln2) } /** * Adds an object to the Bloom filter. The output from the object's * toString() method is used as input to the hash functions. * * @param element is an element to register in the Bloom filter. */ public void add(E element) { add(element.toString().getBytes(charset)); } /** * Adds an array of bytes to the Bloom filter. * * @param bytes array of bytes to add to the Bloom filter. */ public void add(byte[] bytes) { if (redisTemplate.opsForValue().get(RedisConsts.CRAWLER_BLOOMFILTER) == null) { redisTemplate.opsForValue().setBit(RedisConsts.CRAWLER_BLOOMFILTER, 0, false); redisTemplate.expire(RedisConsts.CRAWLER_BLOOMFILTER, expireDays, TimeUnit.DAYS); } int[] hashes = createHashes(bytes, numberOfHashFunctions); for (int hash : hashes) { redisTemplate.opsForValue().setBit(RedisConsts.CRAWLER_BLOOMFILTER, Math.abs(hash % sizeOfBloomFilter), true); } } /** * Adds all elements from a Collection to the Bloom filter. * * @param c Collection of elements. */ public void addAll(Collection<? extends E> c) { for (E element : c) { add(element); } } /** * Returns true if the element could have been inserted into the Bloom filter. * Use getFalsePositiveProbability() to calculate the probability of this * being correct. * * @param element element to check. * @return true if the element could have been inserted into the Bloom filter. */ public boolean contains(E element) { return contains(element.toString().getBytes(charset)); } /** * Returns true if the array of bytes could have been inserted into the Bloom filter. * Use getFalsePositiveProbability() to calculate the probability of this * being correct. 
* * @param bytes array of bytes to check. * @return true if the array could have been inserted into the Bloom filter. */ public boolean contains(byte[] bytes) { int[] hashes = createHashes(bytes, numberOfHashFunctions); for (int hash : hashes) { if (!redisTemplate.opsForValue().getBit(RedisConsts.CRAWLER_BLOOMFILTER, Math.abs(hash % sizeOfBloomFilter))) { return false; } } return true; } /** * Returns true if all the elements of a Collection could have been inserted * into the Bloom filter. Use getFalsePositiveProbability() to calculate the * probability of this being correct. * * @param c elements to check. * @return true if all the elements in c could have been inserted into the Bloom filter. */ public boolean containsAll(Collection<? extends E> c) { for (E element : c) { if (!contains(element)) { return false; } } return true; } /** * Generates digests based on the contents of an array of bytes and splits the result into 4-byte int's and store them in an array. The * digest function is called until the required number of int's are produced. For each call to digest a salt * is prepended to the data. The salt is increased by 1 for each call. * * @param data specifies input data. * @param hashes number of hashes/int's to produce. 
* @return array of int-sized hashes */ public static int[] createHashes(byte[] data, int hashes) { int[] result = new int[hashes]; int k = 0; byte salt = 0; while (k < hashes) { byte[] digest; synchronized (digestFunction) { digestFunction.update(salt); salt++; digest = digestFunction.digest(data); } for (int i = 0; i < digest.length / 4 && k < hashes; i++) { int h = 0; for (int j = (i * 4); j < (i * 4) + 4; j++) { h <<= 8; h |= ((int) digest[j]) & 0xFF; } result[k] = h; k++; } } return result; } public int getSizeOfBloomFilter() { return this.sizeOfBloomFilter; } public int getExpectedNumberOfElements() { return this.expectedNumberOfFilterElements; } public int getNumberOfHashFunctions() { return this.numberOfHashFunctions; } /** * Compares the contents of two instances to see if they are equal. * * @param obj is the object to compare to. * @return True if the contents of the objects are equal. */ @Override public boolean equals(Object obj) { if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } final BloomFilter<E> other = (BloomFilter<E>) obj; if (this.sizeOfBloomFilter != other.sizeOfBloomFilter) { return false; } if (this.expectedNumberOfFilterElements != other.expectedNumberOfFilterElements) { return false; } if (this.numberOfHashFunctions != other.numberOfHashFunctions) { return false; } return true; } /** * Calculates a hash code for this class. * * @return hash code representing the contents of an instance of this class. */ @Override public int hashCode() { int hash = 7; hash = 61 * hash + this.sizeOfBloomFilter; hash = 61 * hash + this.expectedNumberOfFilterElements; hash = 61 * hash + this.numberOfHashFunctions; return hash; } public static void main(String[] args) { BloomFilter<String> bloomFilter = new BloomFilter<>(0.0001, 600000); System.out.println(bloomFilter.getSizeOfBloomFilter()); System.out.println(bloomFilter.getNumberOfHashFunctions()); } }
注:本文代码是在 MagnusS/Java-BloomFilter 的基础上,加上了Redis持久化的实现
参考推荐:
版权所有: 本文系米扑博客原创、转载、摘录,或修订后发表,最后更新于 2021-02-28 22:48:26
侵权处理: 本个人博客,不盈利,若侵犯了您的作品权,请联系博主删除,莫恶意,索钱财,感谢!
打卡,不错