Source code for skshape.numerics.matrix

import numpy as np
from numpy.linalg import solve as direct_solve
from scipy.sparse.construct import spdiags, bmat
from scipy.sparse.linalg import svds, LinearOperator
from scipy.linalg.lapack import dgtsv
from scipy.linalg import solve_banded


[docs]class Matrix(LinearOperator):
[docs] def size(self): raise NotImplementedError("This function has not been implemented.")
[docs] def copy(self): raise NotImplementedError("This function has not been implemented.")
def __imul__(self,scalar): raise NotImplementedError("This function has not been implemented.") def __mul__(self,v): raise NotImplementedError("This function has not been implemented.") def __rmul__(self,v): if np.isscalar(v): return self.__mul__(v) else: raise ValueError("Multiply from right of Matrix works only for scalars!") def __add__(self,M): raise NotImplementedError("This function has not been implemented.") def __sub__(self,M): raise NotImplementedError("This function has not been implemented.")
[docs] def norm(self): raise NotImplementedError("This function has not been implemented.")
def _matvec(self,v): return self.__mul__(v)
[docs] def matvec(self,v): return self.__mul__(v)
[docs] def rmatvec(self,v): raise NotImplementedError("This function has not been implemented.")
[docs] def solve(self,d): raise NotImplementedError("This function has not been implemented.")
[docs]class TridiagMatrix(Matrix): """ The TridiagMatrix class to store tridiagonal matrices of the form :: ( b(0) c(0) a(0) ) ( a(1) b(1) c(1) ) ( a(2) . . ) ( . . . ) ( . . c(n-1) ) ( c(n) a(n) b(n) ) It is used for implementing efficient linear algebra for tridiagonal matrices. The following methods are available: Methods ------- set_diag(d,band_no) get_diag(d,band_no=None) rmatvec(v) factorize() solve(d,L=None,M=None) to_sparse(format='lil') norm(norm_type='Frobenius') copy() size() """ def __init__(self,a,b,c,copy=True): if not copy: # Don't copy, refer to the original input arrays. self._a = a self._b = b self._c = c else: # Store copies of the input arrays. self._b = b.copy() if len(a) == len(b): self._a = a.copy() else: self._a = np.empty(len(b)) n = len(a) self._a[-n:] = a self._a[0:-n] = 0.0 if len(c) == len(b): self._c = c.copy() else: self._c = np.empty(len(b)) n = len(c) self._c[0:n] = c self._c[n:] = 0.0 n = len(b) self.shape = (n,n) self._factorization = None def __repr__(self): repr_str = "lower diagonal: " + str(self._a) + "\n" \ "mid diagonal : " + str(self._b) + "\n" \ "upper diagonal: " + str(self._c) + "\n" return repr_str def __str__(self): repr_str = "Upper " + str(self._a) + \ ", mid " + str(self._b) + ", lower " + str(self._c) return repr_str
[docs] def size(self): 'Tridiag.size() returns the size of the tridiagonal matrix' return len(self._b)
[docs] def copy(self): return TridiagMatrix( self._a, self._b, self._c )
[docs] def set_diag(self,d,band_no): """Set the diagonal band to given d array. Tridiag.setDiag( d, band_no ) sets the band specified with band_no to the values given in d. The argument band_no should be one of -1,0,1. """ if not (band_no in [-1,0,1]): raise ValueError('Argument band_no should be one of -1,0,1') elif len(d) != len(self._b): raise ValueError('Size of input vector d should be %d' % len(self._b)) elif band_no == -1: self._a[:] = d[:] elif band_no == 0: self._b[:] = d[:] elif band_no == 1: self._c[:] = d[:]
[docs] def get_diag(self,band_no=None): """Returns the selected diagonal of the tridiag matrix Tridiag.getDiag( d, band_no ) returns the values in the band specified with band_no, which should be one of -1,0,1. The default value of band_no is None. In that case, all three diagonals are returned. """ if band_no is None: return (self._a, self._b, self._c) if not (band_no in [-1,0,1]): raise ValueError('band_no should be None or one of -1,0,1') elif band_no == -1: return self._a elif band_no == 0: return self._b elif band_no == 1: return self._c
def __imul__(self,scalar): self._a *= scalar self._b *= scalar self._c *= scalar return self def __mul__(self,v): """Matrix-vector product TridiagMatrix.__mul__( v ) returns the matrix-vector product with the vector v. """ a = self._a; b = self._b; c = self._c try: # try multiplication as if v is a vector N = len(v) # might raise an exception if v is a scalar if N != len(b): raise ValueError('The matrix and the vector should be of the same dimension!') u = b * v u[0] += a[0] * v[N-1] u[1:N] += a[1:N] * v[0:N-1] u[N-1] += c[N-1] * v[0] u[0:N-1] += c[0:N-1] * v[1:N] return u except TypeError: # v might be a scalar, or we get another exception new_a = v * a new_b = v * b new_c = v * c return TridiagMatrix( new_a, new_b, new_c, copy=False )
[docs] def rmatvec(self,v): a = self._a; b = self._b; c = self._c try: # try multiplication as if v is a vector N = len(v) # might raise an exception if v is a scalar if N != len(self._b): raise ValueError('The matrix and the vector should be of the same dimension!') u = b * v u[0] += c[N-1] * v[N-1] u[1:N] += c[0:N-1] * v[0:N-1] u[N-1] += a[0] * v[0] u[0:N-1] += a[1:N] * v[1:N] return u except TypeError: # v might be a scalar, or we get another exception new_a = v * a new_b = v * b new_c = v * c return TridiagMatrix( new_a, new_b, new_c, copy=False )
def __add__(self,M): a = self._a + M.get_diag(-1) b = self._b + M.get_diag(0) c = self._c + M.get_diag(1) return TridiagMatrix(a,b,c) def __sub__(self,M): a = self._a - M.get_diag(-1) b = self._b - M.get_diag(0) c = self._c - M.get_diag(1) return TridiagMatrix(a,b,c)
[docs] def norm(self,norm_type='Frobenius'): """Calculates the norm of the matrix. The norm_type should be one 0, 2, inf or 'Frobenius' (the default value). """ if norm_type == 'Frobenius': total = np.sum(self._a**2) total += np.sum(self._b**2) total += np.sum(self._c**2) return np.sqrt(total) elif norm_type in [np.inf,'inf','Inf']: row_sums = np.abs(self._a) + abs(self._b) + abs(self._c) return np.max(row_sums) elif norm_type in [0,'0']: a = self._a c = self._c N = len(a) col_sums = np.abs(self._b) col_sums[0:N-1] += np.abs(a[1:N]) col_sums[N-1] += np.abs(a[0]) col_sums[1:N] += np.abs(c[0:N-1]) col_sums[0] += np.abs(c[N-1]) return np.max(col_sums) elif norm_type in [2,'2']: return svds(self, k=1, which='LM', return_singular_vectors=False) else: raise ValueError('Cannot compute the specified norm')
[docs] def factorize(self): if self._factorization is not None: return self._factorization a = self._a b = self._b c = self._c n = self.size() periodic_tridiag_system = (a[0] != 0) or (c[n-1] != 0) if periodic_tridiag_system: b[0] -= a[0] b[n-1] -= c[n-1] L,U = tridiag_LU( a[1:], b, c[:n-1] ) self._factorization = (L,U) if periodic_tridiag_system: b[0] += a[0] b[n-1] += c[n-1] return (L,U)
[docs] def solve(self, d, L=None, M=None): try: return fast_tridiag_solve(self,d) except Exception as e: print('Could not use fast triadiagonal solver! Reverting to slow Python code. (Error = %s)' % e) #sys.exc_info()[0]) if L is None or M is None: # Factorization not given as arguments. if self._factorization is not None: # Factorization cached. L,M = self._factorization else: # Factorization not available. Sove from scratch. return tridiag_solve(self,d) # At this point, LU factorization (L,M) is available and will be used. a = self._a b = self._b c = self._c n = len(b) if len(d) != n: raise Exception('Matrix A and right hand side vector d should be the same size') periodic_tridiag_system = (a[0] != 0) or (c[n-1] != 0) if periodic_tridiag_system: b[0] -= a[0] b[n-1] -= c[n-1] y = tridiag_solve_from_LU( L, M, c[:n-1], d ) if periodic_tridiag_system: u = np.zeros(n); v = np.zeros(n) u[0] = a[0]; u[n-1] = c[n-1] v[0] = 1; v[n-1] = 1 z = tridiag_solve_from_LU( L, M, c[:n-1], u ) x = y - z * np.dot(v,y) / (1 + np.dot(v,z)) b[0] += a[0] b[n-1] += c[n-1] else: x = y return x
[docs] def to_sparse(self, format='lil'): a = self._a b = self._b c = self._c n = len(b) a0 = a[0] cn = c[n-1] aa = np.hstack(( a[1:n], a0 )) cc = np.hstack(( cn, c[0:n-1] )) data = np.array([aa,b,cc]) diags = np.array([-1,0,1]) A = spdiags( data, diags, n, n, format=format ) A[0,n-1] = a0 A[n-1,0] = cn return A
[docs]def fast_tridiag_solve(A,d): a = A.get_diag(-1) b = A.get_diag(0) c = A.get_diag(1) n = A.size() a0 = a[0] cn = c[n-1] a = a[1:n].copy() b = b.copy() c = c[0:n-1].copy() y = d.copy() periodic_tridiag_system = (a0 != 0) or (cn != 0) if periodic_tridiag_system: b[0] -= a0 b[n-1] -= cn a2 = a.copy() b2 = b.copy() c2 = c.copy() vec_size = c_int(n) n_rhs = c_int(1) info_out = c_int(0) try: # First try the Lapack tridiagonal solver. _,_,_,y,info = dgtsv(a,b,c,d) except Exception as e: # If Lapack tridiag solver doesn't work, try SciPy's solve_banded. print("Call to Lapack failed: error = " + str(e)) print("Unable to use Lapack DGTSV as tridiagonal solver, using scipy.linalg.solve_banded instead.") A = np.empty((3,n)) A[1,0:n] = b A[0,1:n] = c; A[0,0] = 0.0 A[2,0:n-1] = a; A[2,n-1] = 0.0 y = solve_banded( (1,1), A, d ) if not periodic_tridiag_system: x = y else: # it is a periodic tridiag system u = np.zeros(n); v = np.zeros(n) # also use u instead of z u[0] = a0; u[n-1] = cn v[0] = 1; v[n-1] = 1 try: # First try Lapack tridiagonal solver. _,_,_,u,info = dgtsv(a2,b2,c2,u) except Exception as e: # If Lapack tridiag solver doesn't work, try SciPy's solve_banded. print("Call to Lapack failed: error = " + str(e)) print("Unable to use Lapack DGTSV as tridiagonal solver, using scipy.linalg.solve_banded instead.") A = np.empty((3,n)) A[1,0:n] = b2 A[0,1:n] = c2; A[0,0] = 0.0 A[2,0:n-1] = a2; A[2,n-1] = 0.0 u = solve_banded( (1,1), A, u ) x = y - u * np.dot(v,y) / (1 + np.dot(v,u)) # Used u instead of z, compare with the slow tridiag solver. return x
[docs]def tridiag_solve(A,d): """Efficient solve of a tridiagonal linear system. tridiag_solve( A, d ) takes a tridiagonal matrix A and right hand side vector d and returns the solution x of the linear system A x = d. """ a = A.get_diag(-1) b = A.get_diag(0) c = A.get_diag(1) n = A.size() if len(d) != n: raise Exception('Matrix A and right hand side vector d should be the same size') periodic_tridiag_system = (a[0] != 0) or (c[n-1] != 0) if periodic_tridiag_system: b[0] -= a[0] b[n-1] -= c[n-1] l,m = tridiag_LU( a[1:], b, c[:n-1] ) y = tridiag_solve_from_LU( l, m, c[:n-1], d ) if periodic_tridiag_system: u = np.zeros(n); v = np.zeros(n) u[0] = a[0]; u[n-1] = c[n-1] v[0] = 1; v[n-1] = 1 z = tridiag_solve_from_LU( l, m, c[:n-1], u ) x = y - z * np.dot(v,y) / (1 + np.dot(v,z)) b[0] += a[0] b[n-1] += c[n-1] else: x = y return x
[docs]def tridiag_LU(a,b,c): """Computes the LU decomposition of the triadiagonal matrix. tridiag_LU( a, b, c ) computes the LU decomposition of the tridiagonal matrix :: ( b(0) c(0) a(0) ) ( a(1) b(1) c(1) ) A = ( a(2) . . ) ( . . . ) ( . . c(n-1) ) ( c(n) a(n) b(n) ) from the given three bands a,b,c and returns two vectors l,m that give the LU decomposition ``A = L U`` where ``L = Id + diag(-1,l), U = diag(0,m) + diag(1,r)`` """ N = len(b); m = np.empty(N); l = np.empty(N); m[0] = b[0] for i in range(N-1): l[i] = a[i] / m[i] m[i+1] = b[i+1] - l[i]*c[i] return (l,m)
[docs]def tridiag_solve_from_LU(l,m,c,d): """Computes the solution of the linear system from LU decomposition. tridiag_solve_from_LU( l, m, c, d ) takes the LU decomposition l,m (which should be calculated by tridiagLU), the third band c (band_no = 1), and right hand side d of the tridiagonal linear system and returns its solution. See documentation of tridiagLU for more informations on the arguments. """ N = len(d) x = np.empty(N); y = np.empty(N); y[0] = d[0] for i in range(1,N): y[i] = d[i] - l[i-1]*y[i-1] x[N-1] = y[N-1] / m[N-1] for i in range(N-2,-1,-1): x[i] = (y[i] - c[i]*x[i+1]) / m[i] return x
[docs]class BlockDiagMatrix(Matrix): def __init__(self,matrix_list=None): self._submatrices = matrix_list self._factorization = None if matrix_list is None: self._n = 0 else: # Sum of sizes of all submatrices given in matrix_list self._n = sum(( m.size() for m in matrix_list )) self.shape = (self._n, self._n)
[docs] def size(self): return self._n
[docs] def copy(self): if self._submatrices is None: return BlockDiagMatrix() else: submatrices_copy = [ M.copy() for M in self._submatrices ] return BlockDiagMatrix( submatrices_copy )
[docs] def submatrices(self): return self._submatrices
def __imul__(self,scalar): for M in self._submatrices: M *= scalar return self def __mul__(self,v): try: # Try multiplication assuming v is a vector (not a scalar) n = len(v) # Might raise an exception if v is a scalar if n != self._n: raise ValueError('The matrix and the vector should be of the same dimension!') u = np.empty(n) start = 0 for M in self._submatrices: end = start + M.size() u[start:end] = M * v[start:end] start = end return u except TypeError: # v might be a scalar or we get another exception new_matrix_list = [ M.copy()*v for M in self._submatrices ] return BlockDiagMatrix( new_matrix_list )
[docs] def rmatvec(self,v): try: # Try multiplication as if v is a vector n = len(v) # Might raise an exception if v is a scalar if n != self._n: raise ValueError('The matrix and the vector should have the same dimension!') u = np.empty(n) start = 0 for M in self._submatrices: end = start + M.size() u[start:end] = v[start:end] * M start = end return u except TypeError: # v might be a scalar, or we get another exception new_matrix_list = [ v*M.copy() for M in self._submatrices ] return BlockDiagMatrix( new_matrix_list )
def __add__(self,M): new_matrix_list = [ A+B for A,B in zip(self._submatrices, M.submatrices()) ] return BlockDiagMatrix( new_matrix_list ) def __sub__(self,M): new_matrix_list = [ A-B for A,B in zip(self._submatrices, M.submatrices()) ] return BlockDiagMatrix( new_matrix_list )
[docs] def factorize(self): if self._factorization is not None: factorization = self._factorization else: factorization = [ M.factorize() for M in self._submatrices ] self._factorization = factorization return factorization
[docs] def solve(self,v,factorization=None): u = np.empty( len(v) ) if factorization is None: if self._factorization is not None: factorization = self._factorization else: start = 0 for M in self._submatrices: end = start + M.size() u[start:end] = M.solve( v[start:end] ) start = end return u # At this point, factorization is available and will be used. start = 0 for M, (L,U) in zip(self._submatrices, factorization): end = start + M.size() u[start:end] = M.solve( v[start:end], L, U ) start = end return u
[docs] def to_sparse(self, format='lil'): matrices = self._submatrices n = len( matrices ) blocks = [] for k in range(n): row = [None]*k + [ matrices[k].to_sparse(format) ] + [None]*(n-k-1) blocks.append( row ) A = bmat( blocks ) return A
[docs]class MatrixWithRankOneUpdates(Matrix): def __init__(self, core_matrix, update_vectors): self._A = core_matrix self._U = np.vstack(( u for u,v in update_vectors )).T self._V = np.vstack(( v for u,v in update_vectors )) self._C = None self._W = None self.shape = core_matrix.shape
[docs] def size(self): return self._core_matrix.size()
[docs] def copy(self): new_core_matrix = self._A.copy() return MatrixWithRankOneUpdates( new_core_matrix, self.update_vectors() )
[docs] def core_matrix(self): return self._A
[docs] def update_vectors(self,copy=True): U = self._U V = self._V if copy: return [ (U[:,k].copy(), V[k,:].copy()) for k in range(U.shape[1])] else: return [ (U[:,k], V[k,:]) for k in range(U.shape[1]) ]
def __imul__(self,scalar): self._A *= scalar self._U *= scalar if self._W is not None: self._W *= scalar self._C = None return self def __mul__(self,x): A = self._A U = self._U V = self._V try: # Try multiplication assuming v is a vector (not a scalar) n = len(x) # Might raise an exception if v is a scalar if n != self._A.size(): raise ValueError('The matrix and the vector should be of the same dimension!') y = (A * x) + np.dot( U, np.dot(V, x)) return y except TypeError: # v might be a scalar or we get another exception new_A = A.copy() * x new_vecs = [ (U[:,k].copy(), V[k,:]*x) for k in range(U.shape[1]) ] return MatrixWithRankOneUpdates( new_A, new_vecs )
[docs] def rmatvec(self,x): A = self._A U = self._U V = self._V try: # Try multiplication assuming v is a vector (not a scalar) n = len(x) # Might raise an exception if v is a scalar if n != self._A.size(): raise ValueError('The matrix and the vector should be of the same dimension!') y = (x * A) + np.dot( np.dot(x, U), V) return y except TypeError: # v might be a scalar or we get another exception new_A = x * A.copy() new_vecs = [ (x*U[:,k], V[k,:].copy()) for k in range(U.shape[1]) ] return MatrixWithRankOneUpdates( new_A, new_vecs )
def __add__(self,M): new_core_matrix = self._A + M.core_matrix() new_vector_list = list( self.update_vectors() ) new_vector_list.extend( M.update_vectors() ) return MatrixWithRankOneUpdates( new_core_matrix, new_vector_list ) def __sub__(self,M): new_core_matrix = self._A - M.core_matrix() new_vector_list = list( self.update_vectors() ) new_vector_list.extend(( (-u, v.copy()) for u,v in M.update_vectors() )) return MatrixWithRankOneUpdates( new_core_matrix, new_vector_list )
[docs] def solve(self,b): A = self._A V = self._V W = self._W C = self._C n_vectors, vec_size = V.shape if W is None: self._W = W = np.empty( (vec_size, n_vectors) ) for k in range(n_vectors): W[:,k] = A.solve( self._U[:,k] ) if C is None: self._C = C = np.eye( n_vectors ) + np.dot(V,W) y = A.solve(b) z = direct_solve( C, np.dot(V,y) ) # C is supposed to be small y -= np.dot(W,z) return y