Source code for simpleai.search.local

# coding=utf-8
from simpleai.search.utils import BoundedPriorityQueue, InverseTransformSampler
from simpleai.search.models import SearchNodeValueOrdered
import math
import random


def _all_expander(fringe, iteration, viewer):
    '''
    Expander that expands all nodes on the fringe.
    '''
    expanded_neighbors = [node.expand(local_search=True)
                          for node in fringe]

    if viewer:
        viewer.event('expanded', list(fringe), expanded_neighbors)

    list(map(fringe.extend, expanded_neighbors))


[docs]def beam(problem, beam_size=100, iterations_limit=0, viewer=None): ''' Beam search. beam_size is the size of the beam. If iterations_limit is specified, the algorithm will end after that number of iterations. Else, it will continue until it can't find a better node than the current one. Requires: SearchProblem.actions, SearchProblem.result, SearchProblem.value, and SearchProblem.generate_random_state. ''' return _local_search(problem, _all_expander, iterations_limit=iterations_limit, fringe_size=beam_size, random_initial_states=True, stop_when_no_better=iterations_limit==0, viewer=viewer)
def _first_expander(fringe, iteration, viewer): ''' Expander that expands only the first node on the fringe. ''' current = fringe[0] neighbors = current.expand(local_search=True) if viewer: viewer.event('expanded', [current], [neighbors]) fringe.extend(neighbors)
[docs]def beam_best_first(problem, beam_size=100, iterations_limit=0, viewer=None): ''' Beam search best first. beam_size is the size of the beam. If iterations_limit is specified, the algorithm will end after that number of iterations. Else, it will continue until it can't find a better node than the current one. Requires: SearchProblem.actions, SearchProblem.result, and SearchProblem.value. ''' return _local_search(problem, _first_expander, iterations_limit=iterations_limit, fringe_size=beam_size, random_initial_states=True, stop_when_no_better=iterations_limit==0, viewer=viewer)
[docs]def hill_climbing(problem, iterations_limit=0, viewer=None): ''' Hill climbing search. If iterations_limit is specified, the algorithm will end after that number of iterations. Else, it will continue until it can't find a better node than the current one. Requires: SearchProblem.actions, SearchProblem.result, and SearchProblem.value. ''' return _local_search(problem, _first_expander, iterations_limit=iterations_limit, fringe_size=1, stop_when_no_better=True, viewer=viewer)
def _random_best_expander(fringe, iteration, viewer): ''' Expander that expands one randomly chosen nodes on the fringe that is better than the current (first) node. ''' current = fringe[0] neighbors = current.expand(local_search=True) if viewer: viewer.event('expanded', [current], [neighbors]) betters = [n for n in neighbors if n.value > current.value] if betters: chosen = random.choice(betters) if viewer: viewer.event('chosen_node', chosen) fringe.append(chosen)
[docs]def hill_climbing_stochastic(problem, iterations_limit=0, viewer=None): ''' Stochastic hill climbing. If iterations_limit is specified, the algorithm will end after that number of iterations. Else, it will continue until it can't find a better node than the current one. Requires: SearchProblem.actions, SearchProblem.result, and SearchProblem.value. ''' return _local_search(problem, _random_best_expander, iterations_limit=iterations_limit, fringe_size=1, stop_when_no_better=iterations_limit==0, viewer=viewer)
[docs]def hill_climbing_random_restarts(problem, restarts_limit, iterations_limit=0, viewer=None): ''' Hill climbing with random restarts. restarts_limit specifies the number of times hill_climbing will be runned. If iterations_limit is specified, each hill_climbing will end after that number of iterations. Else, it will continue until it can't find a better node than the current one. Requires: SearchProblem.actions, SearchProblem.result, SearchProblem.value, and SearchProblem.generate_random_state. ''' restarts = 0 best = None while restarts < restarts_limit: new = _local_search(problem, _first_expander, iterations_limit=iterations_limit, fringe_size=1, random_initial_states=True, stop_when_no_better=True, viewer=viewer) if not best or best.value < new.value: best = new restarts += 1 if viewer: viewer.event('no_more_runs', best, 'returned after %i runs' % restarts_limit) return best
# Math literally copied from aima-python def _exp_schedule(iteration, k=20, lam=0.005, limit=100): ''' Possible scheduler for simulated_annealing, based on the aima example. ''' return k * math.exp(-lam * iteration) def _create_simulated_annealing_expander(schedule): ''' Creates an expander that has a random chance to choose a node that is worse than the current (first) node, but that chance decreases with time. ''' def _expander(fringe, iteration, viewer): T = schedule(iteration) current = fringe[0] neighbors = current.expand(local_search=True) if viewer: viewer.event('expanded', [current], [neighbors]) if neighbors: succ = random.choice(neighbors) delta_e = succ.value - current.value if delta_e > 0 or random.random() < math.exp(delta_e / T): fringe.pop() fringe.append(succ) if viewer: viewer.event('chosen_node', succ) return _expander
[docs]def simulated_annealing(problem, schedule=_exp_schedule, iterations_limit=0, viewer=None): ''' Simulated annealing. schedule is the scheduling function that decides the chance to choose worst nodes depending on the time. If iterations_limit is specified, the algorithm will end after that number of iterations. Else, it will continue until it can't find a better node than the current one. Requires: SearchProblem.actions, SearchProblem.result, and SearchProblem.value. ''' return _local_search(problem, _create_simulated_annealing_expander(schedule), iterations_limit=iterations_limit, fringe_size=1, stop_when_no_better=iterations_limit==0, viewer=viewer)
def _create_genetic_expander(problem, mutation_chance): ''' Creates an expander that expands the bests nodes of the population, crossing over them. ''' def _expander(fringe, iteration, viewer): fitness = [x.value for x in fringe] sampler = InverseTransformSampler(fitness, fringe) new_generation = [] expanded_nodes = [] expanded_neighbors = [] for _ in fringe: node1 = sampler.sample() node2 = sampler.sample() child = problem.crossover(node1.state, node2.state) action = 'crossover' if random.random() < mutation_chance: # Noooouuu! she is... he is... *IT* is a mutant! child = problem.mutate(child) action += '+mutation' child_node = SearchNodeValueOrdered(state=child, problem=problem, action=action) new_generation.append(child_node) expanded_nodes.append(node1) expanded_neighbors.append([child_node]) expanded_nodes.append(node2) expanded_neighbors.append([child_node]) if viewer: viewer.event('expanded', expanded_nodes, expanded_neighbors) fringe.clear() for node in new_generation: fringe.append(node) return _expander
[docs]def genetic(problem, population_size=100, mutation_chance=0.1, iterations_limit=0, viewer=None): ''' Genetic search. population_size specifies the size of the population (ORLY). mutation_chance specifies the probability of a mutation on a child, varying from 0 to 1. If iterations_limit is specified, the algorithm will end after that number of iterations. Else, it will continue until it can't find a better node than the current one. Requires: SearchProblem.generate_random_state, SearchProblem.crossover, SearchProblem.mutate and SearchProblem.value. ''' return _local_search(problem, _create_genetic_expander(problem, mutation_chance), iterations_limit=iterations_limit, fringe_size=population_size, random_initial_states=True, stop_when_no_better=iterations_limit==0, viewer=viewer)
def _local_search(problem, fringe_expander, iterations_limit=0, fringe_size=1, random_initial_states=False, stop_when_no_better=True, viewer=None): ''' Basic algorithm for all local search algorithms. ''' if viewer: viewer.event('started') fringe = BoundedPriorityQueue(fringe_size) if random_initial_states: for _ in range(fringe_size): s = problem.generate_random_state() fringe.append(SearchNodeValueOrdered(state=s, problem=problem)) else: fringe.append(SearchNodeValueOrdered(state=problem.initial_state, problem=problem)) finish_reason = '' iteration = 0 run = True best = None while run: if viewer: viewer.event('new_iteration', list(fringe)) old_best = fringe[0] fringe_expander(fringe, iteration, viewer) best = fringe[0] iteration += 1 if iterations_limit and iteration >= iterations_limit: run = False finish_reason = 'reaching iteration limit' elif old_best.value >= best.value and stop_when_no_better: run = False finish_reason = 'not being able to improve solution' if viewer: viewer.event('finished', fringe, best, 'returned after %s' % finish_reason) return best