# -*- coding: utf-8 -*-
###########################################################################
# Author: Eric A. Antonelo
# Department of Automation and Systems
# Federal University of Santa Catarina
# Demonstration of Expectation-Maximization for a Mixture of K Gaussians
# Created at: 17 September 2014
# Based on Mitchell's book: "Machine Learning"

from matplotlib import pyplot as plt
from matplotlib.pyplot import plot, axhline
import numpy as np
import random

################## One Gaussian
mu = 5
sigma = 10
n = 1000
sample = lambda: random.gauss(mu, sigma)
points = np.array([sample() for i in range(n)])
plot(points, '*')
plt.title('One Gaussian')
#### the estimate of the mean is simply the sample mean of the points
print('Estimated mean:', np.mean(points))
plt.show()

################## Two Gaussians
mu_a = np.array([-10, 30])  # true means
sigma = 9                   # known, shared standard deviation
n = 1000
# each point is drawn from one of the two Gaussians, chosen uniformly
sel_mu = lambda: random.choice(mu_a)
sample = lambda sel_mu: random.gauss(sel_mu(), sigma)
x = np.array([sample(sel_mu) for i in range(n)])
plot(x, '*')
plt.title('Two Gaussians')
#### We will use EM (Expectation-Maximization) to estimate the means
#### (hypotheses) of each Gaussian from the data
plt.show()

def EM_Gaussians(x, sigma, K=2):
    ## initial guess for the hypotheses (means of the Gaussians):
    ## K points chosen at random from the data
    h = np.array([random.choice(x) for i in range(K)])
    # alternative initializations:
    #ini_h = lambda infx, supx: np.arange(infx, supx, (supx - infx + 1) / K + 1)
    #ini_h = lambda infx, supx: [infx, supx]
    #h = np.array(ini_h(min(x), max(x)))
    h_ = np.array(h)
    print('Initial estimate of the means:', h)
    # z[i][j] holds the expectation of the non-observable variable z_ij,
    # which indicates whether Gaussian j generated the i-th point
    z = np.zeros((x.size, h.size))
    diff = 10
    _inc = 0.00001  # avoid division by zero
    iterations = 0
    while diff > 0.001:
        ## E-step: estimation step
        #- z receives the expected values of the non-observable variables,
        #  given the current hypotheses h; the Gaussian normalization
        #  constant cancels in the ratio since sigma is shared
        p = lambda xi, u: np.exp((-1.0 / (2 * sigma**2)) * (xi - u)**2)
        expected_z = lambda i, j: p(x[i], h[j]) / (sum([p(x[i], h[k]) for k in range(h.size)]) + _inc)
        for i in range(x.size):
            for j in range(h.size):
                z[i][j] = expected_z(i, j)
        ## M-step: find new hypotheses (means of the Gaussians) which maximize
        ## the likelihood of observing the full data y = [x, z], given the current h.
        ## Weighted average of the points, where each weight is the probability
        ## (from the E-step) that the point belongs to that Gaussian
        for j in range(h_.size):
            h_[j] = sum([z[i][j] * x[i] for i in range(x.size)]) / (sum([z[i][j] for i in range(x.size)]) + _inc)
        # mean absolute change in the hypotheses, used to check for loop termination
        diff = np.mean(np.abs([h[j] - h_[j] for j in range(h.size)]))
        iterations += 1
        # update the current hypotheses
        h = np.array(h_)
    return (h, z)

#########################################
medias, z = EM_Gaussians(x, sigma)
print('Estimated means of the Gaussians:', medias)
print('True means:', mu_a)

#########################################
## Plot the final estimate of the probability of each point belonging to
## each Gaussian: assign every point to its most likely Gaussian
# optionally, plot the membership probabilities for one Gaussian: plot(z[:, 1], '*-')
x_1 = np.zeros(n)
x_2 = np.zeros(n)
x_1[:] = np.nan
x_2[:] = np.nan
for i in range(n):
    if z[i][0] > z[i][1]:
        x_1[i] = x[i]
    else:
        x_2[i] = x[i]
plot(x_1, '*c')
plot(x_2, '*k')
axhline(medias[0])
axhline(medias[1])
## true means => in red
axhline(mu_a[0], color='r')
axhline(mu_a[1], color='r')
plt.title('Results')
plt.show()
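
###########################################################################
## Optional cross-check (a sketch, assuming scikit-learn is installed;
## not part of the original demo): GaussianMixture runs full EM, also
## estimating variances and mixing weights, so its means should land
## close to `medias` above. Uncomment to try:
# from sklearn.mixture import GaussianMixture
# gm = GaussianMixture(n_components=2).fit(x.reshape(-1, 1))
# print('scikit-learn means:', np.sort(gm.means_.ravel()))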