#!/usr/bin/env python
# coding=utf-8
import logging

import numpy
import scipy.optimize

logger = logging.getLogger()


def make_labels(X):
"""Helper function that generates a single 1D array with labels which
are good targets for stock logistic regression.
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 3 dimensions or an
        iterable containing 2 arrays with 2 dimensions each. Each corresponds to
the data for one of the two classes, every row corresponds to one
example of the data set, every column, one different feature.
Returns
=======
labels : numpy.ndarray
With a single dimension, containing suitable labels for all rows and
for all classes defined in X (depth).
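
    Examples
    ========

    Illustrative only, with two hypothetical classes of two examples each:

    >>> import numpy
    >>> make_labels([numpy.zeros((2, 3)), numpy.ones((2, 3))])
    array([0, 0, 1, 1])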
"""
return numpy.hstack(
[k * numpy.ones(len(X[k]), dtype=int) for k in range(len(X))]
)


class Machine:
"""A class to handle all run-time aspects for Logistic Regression
Parameters
==========
theta : numpy.ndarray
A set of parameters for the Logistic Regression model. This must be an
iterable (or numpy array) with all parameters for the model, including
        the bias term, which must be at entry 0 (the first entry of the
        iterable).
"""
    def __init__(self, theta):
        # numpy.array() already copies; force floats so integer parameter
        # vectors do not truncate later updates
        self.theta = numpy.array(theta, dtype=float)
def __call__(self, X):
"""Spits out the hypothesis given the data.
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 2 dimensions.
Every row corresponds to one example of the data set, every column,
one different feature.
Returns
=======
hypothesis : numpy.ndarray
A 1D array with as many entries as rows in the input 2D array
``X``, representing g(x), the sigmoidal hypothesis.
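
        Examples
        ========

        Illustrative only: with all parameters at zero, the hypothesis is
        0.5 for every example.

        >>> import numpy
        >>> m = Machine(numpy.zeros(3))  # bias + 2 feature weights
        >>> m(numpy.ones((2, 2))).tolist()
        [0.5, 0.5]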
"""
Xp = numpy.hstack((numpy.ones((len(X), 1)), X)) # add bias term
return 1.0 / (1.0 + numpy.exp(-numpy.dot(Xp, self.theta)))
    def predict(self, X):
"""Predicts the class of each row of X
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 2 dimensions.
Every row corresponds to one example of the data set, every column,
one different feature.
Returns
=======
        predictions : numpy.ndarray
            A 1D integer array with as many entries as rows in the input 2D
            array ``X``, containing the predicted class (0 or 1) for each
            example.
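
        Examples
        ========

        Illustrative only, with hypothetical parameters (bias 0, one
        feature weight of 1):

        >>> import numpy
        >>> m = Machine([0.0, 1.0])
        >>> m.predict(numpy.array([[-2.0], [3.0]])).tolist()
        [0, 1]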
"""
retval = self(X)
retval[retval < 0.5] = 0.0
retval[retval >= 0.5] = 1.0
return retval.astype(int)
    def J(self, X, regularizer=0.0):
"""
Calculates the logistic regression cost
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 3 dimensions or an
iterable containing 2 numpy.ndarrays with 2 dimensions each. Each
            corresponds to the data for one of the two classes, every row
corresponds to one example of the data set, every column, one
different feature.
regularizer : float
The regularization parameter
Returns
=======
cost : float
The averaged (regularized) cost for the whole dataset
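
        Examples
        ========

        Illustrative only: with all parameters at zero the hypothesis is 0.5
        everywhere, so the unregularized cost is -log(0.5) = log(2).

        >>> import numpy
        >>> m = Machine(numpy.zeros(3))
        >>> X = [numpy.zeros((2, 2)), numpy.ones((2, 2))]
        >>> round(float(m.J(X)), 4)
        0.6931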
"""
        # binary problem: X[0] holds class-0 examples, X[1] class-1 examples
        h = numpy.hstack([self(X[k]) for k in (0, 1)])
        y = make_labels(X)
        # nan_to_num() guards against -inf from log(0) when g(x) saturates
        logh = numpy.nan_to_num(numpy.log(h))
        log1h = numpy.nan_to_num(numpy.log(1 - h))
        regularization_term = regularizer * (self.theta[1:] ** 2).sum()
        main_term = -(y * logh + ((1 - y) * log1h)).mean()
        return main_term + regularization_term
    def dJ(self, X, regularizer=0.0):
"""
Calculates the logistic regression first derivative of the cost w.r.t.
each parameter theta
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 3 dimensions or
an iterable containing 2 arrays with 2 dimensions each. Each
            corresponds to the data for one of the two classes, every row
corresponds to one example of the data set, every column, one
different feature.
regularizer : float
The regularization parameter, if the solution should be regularized.
Returns
=======
grad : numpy.ndarray
            A 1D array with as many entries as columns in the input matrix
            ``X`` plus 1 (the bias term). It denotes the average gradient of
            the cost w.r.t. each machine parameter theta.
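
        Examples
        ========

        Illustrative only: with all parameters at zero, the gradient is the
        average of (0.5 - y) times each bias-extended example:

        >>> import numpy
        >>> m = Machine(numpy.zeros(3))
        >>> X = [numpy.zeros((2, 2)), numpy.ones((2, 2))]
        >>> m.dJ(X).tolist()
        [0.0, -0.25, -0.25]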
"""
        Xflat = numpy.vstack(X)  # stack all classes into a single 2D array
        Xp = numpy.hstack((numpy.ones((len(Xflat), 1)), Xflat))  # add bias term
        y = make_labels(X)
        # average gradient of the cross-entropy term over all examples
        retval = ((self(Xflat) - y) * Xp.T).T.mean(axis=0)
        # gradient of the penalty used in J(): d/dtheta [r * sum(theta[1:]**2)]
        retval[1:] += 2 * regularizer * self.theta[1:]
        return retval


class Trainer:
"""A class to handle all training aspects for Logistic Regression
Parameters
==========
regularizer : float
The regularization parameter
"""
def __init__(self, regularizer=0.0):
self.regularizer = regularizer
    def J(self, theta, machine, X):
"""
Calculates the vectorized cost *J*.
"""
machine.theta = theta
return machine.J(X, self.regularizer)
    def dJ(self, theta, machine, X):
        r"""
        Calculates the vectorized partial derivative of the cost *J* w.r.t.
        **all** :math:`\theta`'s, setting ``machine.theta`` to ``theta``
        first. Uses the training dataset.
        """
machine.theta = theta
return machine.dJ(X, self.regularizer)
    def train(self, X):
"""
Optimizes the machine parameters to fit the input data, using
``scipy.optimize.fmin_l_bfgs_b``.
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 3 dimensions or
an iterable containing 2 arrays with 2 dimensions each. Each
            corresponds to the data for one of the two classes, every row
corresponds to one example of the data set, every column, one
different feature.
Returns
=======
machine : Machine
A trained machine.
Raises
======
RuntimeError
In case problems exist with the design matrix ``X`` or with
convergence.
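
        Examples
        ========

        A hypothetical, linearly separable toy problem (the small
        regularizer keeps the optimum finite):

        >>> import numpy
        >>> neg = numpy.array([[-1.0, -1.0], [-0.8, -1.2]])
        >>> pos = numpy.array([[1.0, 1.0], [1.2, 0.8]])
        >>> machine = Trainer(regularizer=0.1).train([neg, pos])
        >>> machine.predict(numpy.vstack((neg, pos))).tolist()
        [0, 0, 1, 1]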
"""
# check data dimensionality if not organized in a matrix
if not isinstance(X, numpy.ndarray):
baseline = X[0].shape[1]
for k in X:
if k.shape[1] != baseline:
raise RuntimeError(
"Mismatch on the dimensionality of input `X`"
)
# prepare the machine
theta0 = numpy.zeros(X[0].shape[1] + 1) # include bias terms
machine = Machine(theta0)
logger.debug("Settings:")
logger.debug(" * initial guess = %s", [k for k in theta0])
logger.debug(" * cost (J) = %g", machine.J(X, self.regularizer))
logger.debug("Training using scipy.optimize.fmin_l_bfgs_b()...")
# Fill in the right parameters so that the minimization can take place
        theta, cost, d = scipy.optimize.fmin_l_bfgs_b(
            self.J,
            theta0,
            fprime=self.dJ,
            args=(machine, X),
        )
if d["warnflag"] == 0:
logger.info("** LBFGS converged successfuly **")
machine.theta = theta
logger.debug("Final settings:")
logger.debug(" * theta = %s", [k for k in theta])
logger.debug(" * cost (J) = %g", cost)
return machine
else:
message = "LBFGS did **not** converged:"
if d["warnflag"] == 1:
message += " Too many function evaluations"
elif d["warnflag"] == 2:
message += " %s" % d["task"]
raise RuntimeError(message)


class MultiClassMachine:
"""A class to handle all run-time aspects for Multiclass Log. Regression
Parameters
==========
machines : :py:class:`list` or :py:class:`tuple`
An iterable over any number of machines that will be stored.
"""
def __init__(self, machines):
self.machines = machines
def __call__(self, X):
"""Spits out the hypothesis for each machine given the data
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 2 dimensions.
Every row corresponds to one example of the data set, every column,
one different feature.
Returns
=======
hypothesis : numpy.ndarray
            A 2D array with one row per row of the input array ``X``,
            representing g(x), the sigmoidal hypothesis. Each column of the
            output array represents the output of one of the logistic
            regression machines stored in this object.
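
        Examples
        ========

        Illustrative only, with two hypothetical single-feature machines:

        >>> import numpy
        >>> mc = MultiClassMachine([Machine([0.0, 1.0]), Machine([0.0, -1.0])])
        >>> mc(numpy.array([[0.0], [0.0]])).tolist()
        [[0.5, 0.5], [0.5, 0.5]]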
"""
return numpy.vstack([m(X) for m in self.machines]).T
    def predict(self, X):
"""Predicts the class of each row of X
Parameters
==========
        X : numpy.ndarray
            The input data matrix. This must be an array with 2 dimensions.
            Every row corresponds to one example of the data set, every
            column, one different feature.
Returns
=======
        predictions : numpy.ndarray
            A 1D array with as many entries as rows in the input 2D array
            ``X``, containing the predicted class index (the machine with the
            highest hypothesis) for each example.
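
        Examples
        ========

        Illustrative only, reusing the hypothetical two-machine setup from
        ``__call__``:

        >>> import numpy
        >>> mc = MultiClassMachine([Machine([0.0, 1.0]), Machine([0.0, -1.0])])
        >>> mc.predict(numpy.array([[2.0], [-2.0]])).tolist()
        [0, 1]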
"""
return self(X).argmax(axis=1)


class MultiClassTrainer:
"""A class to handle all training aspects for Multiclass Log. Regression
Parameters
==========
regularizer : float
The regularization parameter
"""
def __init__(self, regularizer=0.0):
self.regularizer = regularizer
    def train(self, X):
"""
Trains multiple logistic regression classifiers to handle the multiclass
problem posed by ``X``.
Parameters
==========
X : numpy.ndarray
The input data matrix. This must be an array with 3 dimensions or
an iterable containing 2 arrays with 2 dimensions each. Each
            corresponds to the data for one of the input classes, every row
corresponds to one example of the data set, every column, one
different feature.
Returns
=======
        machine : Machine or MultiClassMachine
            A trained machine: a single :py:class:`Machine` if ``X`` contains
            two classes, or a :py:class:`MultiClassMachine` otherwise.
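
        Examples
        ========

        A hypothetical, well-separated 3-class toy problem:

        >>> import numpy
        >>> c0 = numpy.array([[0.0, 0.0], [0.1, 0.1]])
        >>> c1 = numpy.array([[5.0, 0.0], [5.1, 0.1]])
        >>> c2 = numpy.array([[0.0, 5.0], [0.1, 5.1]])
        >>> mc = MultiClassTrainer(regularizer=0.1).train([c0, c1, c2])
        >>> mc.predict(numpy.vstack((c0, c1, c2))).tolist()
        [0, 0, 1, 1, 2, 2]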
"""
_trainer = Trainer(self.regularizer)
        if len(X) == 2:
            # trains and returns a single logistic regression classifier
            return _trainer.train(X)
else: # trains and returns a multi-class logistic regression classifier
# use one-versus-all strategy
machines = []
            for k in range(len(X)):
                # one-vs-all: class k is "positive", all other classes are
                # pooled as "negative"
                NC_range = list(range(0, k)) + list(range(k + 1, len(X)))
                # indexing a plain Python list with another list fails, so
                # stack the negative-class arrays explicitly
                Xp = [numpy.vstack([X[j] for j in NC_range]), X[k]]
machines.append(_trainer.train(Xp))
return MultiClassMachine(machines)
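

if __name__ == "__main__":
    # Minimal smoke test, illustrative only: train a binary machine on a
    # hypothetical separable problem and print its predictions.
    logging.basicConfig(level=logging.INFO)
    neg = numpy.array([[-1.0, -1.0], [-0.8, -1.2]])
    pos = numpy.array([[1.0, 1.0], [1.2, 0.8]])
    machine = Trainer(regularizer=0.1).train([neg, pos])
    print(machine.predict(numpy.vstack((neg, pos))))  # expected: [0 0 1 1]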