1+ using System ;
2+
3+ namespace Nest
4+ {
5+ public class ReindexObservable < T > : IDisposable , IObservable < IReindexResponse < T > > where T : class
6+ {
7+ private ReindexDescriptor < T > _reindexDescriptor ;
8+ internal IElasticClient CurrentClient { get ; set ; }
9+ internal ReindexDescriptor < T > ReindexDescriptor { get ; set ; }
10+
11+ public ReindexObservable ( IElasticClient client , ReindexDescriptor < T > reindexDescriptor )
12+ {
13+ this . _reindexDescriptor = reindexDescriptor ;
14+ this . CurrentClient = client ;
15+ }
16+
17+ public IDisposable Subscribe ( IObserver < IReindexResponse < T > > observer )
18+ {
19+ observer . ThrowIfNull ( "observer" ) ;
20+ try
21+ {
22+ this . Reindex ( observer ) ;
23+ }
24+ catch ( Exception e )
25+ {
26+ observer . OnError ( e ) ;
27+ }
28+ return this ;
29+
30+ }
31+
32+ private void Reindex ( IObserver < IReindexResponse < T > > observer )
33+ {
34+ var fromIndex = this . _reindexDescriptor . _FromIndexName ;
35+ var toIndex = this . _reindexDescriptor . _ToIndexName ;
36+ var scroll = this . _reindexDescriptor . _Scroll ?? "2m" ;
37+
38+ fromIndex . ThrowIfNullOrEmpty ( "fromIndex" ) ;
39+ toIndex . ThrowIfNullOrEmpty ( "toIndex" ) ;
40+
41+ var indexSettings = this . CurrentClient . GetIndexSettings ( this . _reindexDescriptor . _FromIndexName ) ;
42+ var createSettings = new CreateIndexDescriptor ( this . CurrentClient . Settings ) . InitializeUsing ( indexSettings . Settings ) ;
43+ var createIndexResponse = this . CurrentClient
44+ . CreateIndex ( toIndex , ( c ) => this . _reindexDescriptor . _CreateIndexSelector ( createSettings ) ) ;
45+ if ( ! createIndexResponse . IsValid )
46+ throw new ReindexException ( createIndexResponse . ConnectionStatus ) ;
47+
48+ var page = 0 ;
49+ var searchResult = this . CurrentClient . Search < T > (
50+ s => s
51+ . Index ( fromIndex )
52+ . AllTypes ( )
53+ . From ( 0 )
54+ . Take ( 100 )
55+ . Query ( this . _reindexDescriptor . _QuerySelector )
56+ . SearchType ( SearchType . Scan )
57+ . Scroll ( scroll )
58+ ) ;
59+ if ( searchResult . Total <= 0 )
60+ throw new ReindexException ( searchResult . ConnectionStatus , "index " + fromIndex + " has no documents!" ) ;
61+ IBulkResponse indexResult = null ;
62+ do
63+ {
64+ searchResult = this . CurrentClient . Scroll < T > ( scroll , searchResult . ScrollId ) ;
65+ if ( searchResult . Documents . HasAny ( ) )
66+ indexResult = this . IndexSearchResults ( searchResult , observer , toIndex , page ) ;
67+ page ++ ;
68+ } while ( searchResult . IsValid && indexResult != null && indexResult . IsValid && searchResult . Documents . HasAny ( ) ) ;
69+
70+
71+ observer . OnCompleted ( ) ;
72+ }
73+
74+ public IBulkResponse IndexSearchResults ( IQueryResponse < T > searchResult , IObserver < IReindexResponse < T > > observer , string toIndex , int page )
75+ {
76+ if ( ! searchResult . IsValid )
77+ throw new ReindexException ( searchResult . ConnectionStatus , "reindex failed on scroll #" + page ) ;
78+
79+ var indexResult = this . CurrentClient . IndexMany ( searchResult . Documents , toIndex ) ;
80+ if ( ! indexResult . IsValid )
81+ throw new ReindexException ( indexResult . ConnectionStatus , "reindex failed when indexing page " + page ) ;
82+
83+ observer . OnNext ( new ReindexResponse < T > ( )
84+ {
85+ BulkResponse = indexResult ,
86+ QueryResponse = searchResult ,
87+ Scroll = page
88+ } ) ;
89+ return indexResult ;
90+ }
91+
92+
93+ public void Dispose ( )
94+ {
95+
96+ }
97+ }
98+ }
0 commit comments