22using rubberduckvba . Server . ContentSynchronization . Pipeline . Sections . Context ;
33using rubberduckvba . Server . Model ;
44using rubberduckvba . Server . Services ;
5+ using rubberduckvba . Server . Services . rubberduckdb ;
56using System . Threading . Tasks . Dataflow ;
67
78namespace rubberduckvba . Server . ContentSynchronization . Pipeline . Sections . SyncTags ;
89
910public class SyncTagsSection : PipelineSection < SyncContext >
1011{
11- public SyncTagsSection ( IPipeline < SyncContext , bool > parent , CancellationTokenSource tokenSource , ILogger logger , IRubberduckDbService content , IGitHubClientService github , IStagingServices staging )
12+ public SyncTagsSection ( IPipeline < SyncContext , bool > parent , CancellationTokenSource tokenSource , ILogger logger , TagServices tagServices , IGitHubClientService github , IStagingServices staging )
1213 : base ( parent , tokenSource , logger )
1314 {
14- ReceiveRequest = new ReceiveRequestBlock ( this , tokenSource , logger ) ;
15- BroadcastParameters = new BroadcastParametersBlock ( this , tokenSource , logger ) ;
16- AcquireDbMainTag = new AcquireDbMainTagGraphBlock ( this , tokenSource , content , logger ) ;
17- AcquireDbNextTag = new AcquireDbNextTagGraphBlock ( this , tokenSource , content , logger ) ;
18- JoinDbTags = new DataflowJoinBlock < TagGraph , TagGraph > ( this , tokenSource , logger , nameof ( JoinDbTags ) ) ;
19- LoadDbTags = new LoadDbLatestTagsBlock ( this , tokenSource , logger ) ;
20- LoadGitHubTags = new LoadGitHubTagsBlock ( this , tokenSource , github , logger ) ;
21- JoinTags = new DataflowJoinBlock < SyncContext , SyncContext > ( this , tokenSource , logger , nameof ( JoinTags ) ) ;
22- BroadcastTags = new BroadcastTagsBlock ( this , tokenSource , logger ) ;
23- StreamGitHubTags = new StreamGitHubTagsBlock ( this , tokenSource , logger ) ;
24- GetTagAssets = new GetTagAssetsBlock ( this , tokenSource , github , logger ) ;
25- TagBuffer = new TagBufferBlock ( this , tokenSource , logger ) ;
26- AccumulateProcessedTags = new AccumulateProcessedTagsBlock ( this , tokenSource , logger ) ;
27- SaveTags = new BulkSaveStagingBlock ( this , tokenSource , staging , logger ) ;
15+ SynchronizeTags = new SynchronizeTagsBlock ( this , tokenSource , logger , tagServices , github ) ;
16+ //ReceiveRequest = new ReceiveRequestBlock(this, tokenSource, logger);
17+ //BroadcastParameters = new BroadcastParametersBlock(this, tokenSource, logger);
18+ //AcquireDbMainTag = new AcquireDbMainTagGraphBlock(this, tokenSource, content, logger);
19+ //AcquireDbNextTag = new AcquireDbNextTagGraphBlock(this, tokenSource, content, logger);
20+ //JoinDbTags = new DataflowJoinBlock<TagGraph, TagGraph>(this, tokenSource, logger, nameof(JoinDbTags));
21+ //LoadDbTags = new LoadDbLatestTagsBlock(this, tokenSource, logger);
22+ //LoadGitHubTags = new LoadGitHubTagsBlock(this, tokenSource, github, logger);
23+ //JoinTags = new DataflowJoinBlock<SyncContext, SyncContext>(this, tokenSource, logger, nameof(JoinTags));
24+ //BroadcastTags = new BroadcastTagsBlock(this, tokenSource, logger);
25+ //StreamGitHubTags = new StreamGitHubTagsBlock(this, tokenSource, logger);
26+ //GetTagAssets = new GetTagAssetsBlock(this, tokenSource, github, logger);
27+ //TagBuffer = new TagBufferBlock(this, tokenSource, logger);
28+ //AccumulateProcessedTags = new AccumulateProcessedTagsBlock(this, tokenSource, logger);
29+ //SaveTags = new BulkSaveStagingBlock(this, tokenSource, staging, logger);
2830 }
2931
30- #region blocks
31- private ReceiveRequestBlock ReceiveRequest { get ; }
32- private BroadcastParametersBlock BroadcastParameters { get ; }
33- private AcquireDbMainTagGraphBlock AcquireDbMainTag { get ; }
34- private AcquireDbNextTagGraphBlock AcquireDbNextTag { get ; }
35- private DataflowJoinBlock < TagGraph , TagGraph > JoinDbTags { get ; }
36- private LoadDbLatestTagsBlock LoadDbTags { get ; }
37- private LoadGitHubTagsBlock LoadGitHubTags { get ; }
38- private DataflowJoinBlock < SyncContext , SyncContext > JoinTags { get ; }
39- private BroadcastTagsBlock BroadcastTags { get ; }
40- private StreamGitHubTagsBlock StreamGitHubTags { get ; }
41- private GetTagAssetsBlock GetTagAssets { get ; }
42- private TagBufferBlock TagBuffer { get ; }
43- private AccumulateProcessedTagsBlock AccumulateProcessedTags { get ; }
44- private BulkSaveStagingBlock SaveTags { get ; }
32+ //#region blocks
33+ private SynchronizeTagsBlock SynchronizeTags { get ; }
34+ //private ReceiveRequestBlock ReceiveRequest { get; }
35+ //private BroadcastParametersBlock BroadcastParameters { get; }
36+ //private AcquireDbMainTagGraphBlock AcquireDbMainTag { get; }
37+ //private AcquireDbNextTagGraphBlock AcquireDbNextTag { get; }
38+ //private DataflowJoinBlock<TagGraph, TagGraph> JoinDbTags { get; }
39+ //private LoadDbLatestTagsBlock LoadDbTags { get; }
40+ //private LoadGitHubTagsBlock LoadGitHubTags { get; }
41+ //private DataflowJoinBlock<SyncContext, SyncContext> JoinTags { get; }
42+ //private BroadcastTagsBlock BroadcastTags { get; }
43+ //private StreamGitHubTagsBlock StreamGitHubTags { get; }
44+ //private GetTagAssetsBlock GetTagAssets { get; }
45+ //private TagBufferBlock TagBuffer { get; }
46+ //private AccumulateProcessedTagsBlock AccumulateProcessedTags { get; }
47+ //private BulkSaveStagingBlock SaveTags { get; }
4548
46- public ITargetBlock < SyncRequestParameters > InputBlock => ReceiveRequest . Block ;
47- public Task OutputTask => SaveTags . Block . Completion ;
49+ public ITargetBlock < TagSyncRequestParameters > InputBlock => SynchronizeTags . Block ! ;
50+ public Task OutputTask => SynchronizeTags . Block . Completion ;
4851
4952 protected override IReadOnlyDictionary < string , IDataflowBlock > Blocks => new Dictionary < string , IDataflowBlock >
5053 {
51- [ nameof ( ReceiveRequest ) ] = ReceiveRequest . Block ,
52- [ nameof ( BroadcastParameters ) ] = BroadcastParameters . Block ,
53- [ nameof ( AcquireDbMainTag ) ] = AcquireDbMainTag . Block ,
54- [ nameof ( AcquireDbNextTag ) ] = AcquireDbNextTag . Block ,
55- [ nameof ( JoinDbTags ) ] = JoinDbTags . Block ,
56- [ nameof ( LoadDbTags ) ] = LoadDbTags . Block ,
57- [ nameof ( LoadGitHubTags ) ] = LoadGitHubTags . Block ,
58- [ nameof ( JoinTags ) ] = JoinTags . Block ,
59- [ nameof ( BroadcastTags ) ] = BroadcastTags . Block ,
60- [ nameof ( StreamGitHubTags ) ] = StreamGitHubTags . Block ,
61- [ nameof ( GetTagAssets ) ] = GetTagAssets . Block ,
62- [ nameof ( TagBuffer ) ] = TagBuffer . Block ,
63- [ nameof ( AccumulateProcessedTags ) ] = AccumulateProcessedTags . Block ,
64- [ nameof ( SaveTags ) ] = SaveTags . Block ,
54+ [ nameof ( SynchronizeTags ) ] = SynchronizeTags . Block ,
55+ // [nameof(ReceiveRequest)] = ReceiveRequest.Block,
56+ // [nameof(BroadcastParameters)] = BroadcastParameters.Block,
57+ // [nameof(AcquireDbMainTag)] = AcquireDbMainTag.Block,
58+ // [nameof(AcquireDbNextTag)] = AcquireDbNextTag.Block,
59+ // [nameof(JoinDbTags)] = JoinDbTags.Block,
60+ // [nameof(LoadDbTags)] = LoadDbTags.Block,
61+ // [nameof(LoadGitHubTags)] = LoadGitHubTags.Block,
62+ // [nameof(JoinTags)] = JoinTags.Block,
63+ // [nameof(BroadcastTags)] = BroadcastTags.Block,
64+ // [nameof(StreamGitHubTags)] = StreamGitHubTags.Block,
65+ // [nameof(GetTagAssets)] = GetTagAssets.Block,
66+ // [nameof(TagBuffer)] = TagBuffer.Block,
67+ // [nameof(AccumulateProcessedTags)] = AccumulateProcessedTags.Block,
68+ // [nameof(SaveTags)] = SaveTags.Block,
6569 } ;
66- #endregion
70+ // #endregion
6771
6872 public override void CreateBlocks ( )
6973 {
70- ReceiveRequest . CreateBlock ( ) ;
71- BroadcastParameters . CreateBlock ( ReceiveRequest ) ;
72- AcquireDbMainTag . CreateBlock ( BroadcastParameters ) ;
73- AcquireDbNextTag . CreateBlock ( BroadcastParameters ) ;
74- JoinDbTags . CreateBlock ( AcquireDbMainTag , AcquireDbNextTag ) ;
75- LoadDbTags . CreateBlock ( JoinDbTags ) ;
76- LoadGitHubTags . CreateBlock ( LoadDbTags ) ;
77- JoinTags . CreateBlock ( LoadDbTags , LoadGitHubTags ) ;
78- BroadcastTags . CreateBlock ( JoinTags ) ;
79- StreamGitHubTags . CreateBlock ( BroadcastTags ) ;
80- GetTagAssets . CreateBlock ( StreamGitHubTags ) ;
81- TagBuffer . CreateBlock ( GetTagAssets ) ;
82- AccumulateProcessedTags . CreateBlock ( TagBuffer ) ;
83- SaveTags . CreateBlock ( AccumulateProcessedTags ) ;
74+ SynchronizeTags . CreateBlock ( ) ;
75+ //ReceiveRequest.CreateBlock();
76+ //BroadcastParameters.CreateBlock(ReceiveRequest);
77+ //AcquireDbMainTag.CreateBlock(BroadcastParameters);
78+ //AcquireDbNextTag.CreateBlock(BroadcastParameters);
79+ //JoinDbTags.CreateBlock(AcquireDbMainTag, AcquireDbNextTag);
80+ //LoadDbTags.CreateBlock(JoinDbTags);
81+ //LoadGitHubTags.CreateBlock(LoadDbTags);
82+ //JoinTags.CreateBlock(LoadDbTags, LoadGitHubTags);
83+ //BroadcastTags.CreateBlock(JoinTags);
84+ //StreamGitHubTags.CreateBlock(BroadcastTags);
85+ //GetTagAssets.CreateBlock(StreamGitHubTags);
86+ //TagBuffer.CreateBlock(GetTagAssets);
87+ //AccumulateProcessedTags.CreateBlock(TagBuffer);
88+ //SaveTags.CreateBlock(AccumulateProcessedTags);
8489 }
8590}
91+
92+ public class SynchronizeTagsBlock : ActionBlockBase < TagSyncRequestParameters , SyncContext >
93+ {
94+ private readonly IGitHubClientService _github ;
95+ private readonly TagServices _tagServices ;
96+
97+ public SynchronizeTagsBlock ( PipelineSection < SyncContext > parent , CancellationTokenSource tokenSource , ILogger logger ,
98+ TagServices tagServices ,
99+ IGitHubClientService github )
100+ : base ( parent , tokenSource , logger )
101+ {
102+ _tagServices = tagServices ;
103+ _github = github ;
104+ }
105+
106+ protected override async Task ActionAsync ( TagSyncRequestParameters input )
107+ {
108+ var getGithubTags = _github . GetAllTagsAsync ( ) ;
109+ var dbMain = _tagServices . GetLatestTag ( false ) ;
110+ var dbNext = _tagServices . GetLatestTag ( true ) ;
111+
112+ var githubTags = await getGithubTags ;
113+ var ( gitHubMain , gitHubNext , _) = githubTags . GetLatestTags ( ) ;
114+
115+ var mergedMain = ( dbMain ?? gitHubMain with { InstallerDownloads = gitHubMain . InstallerDownloads } ) ! ;
116+ var mergedNext = ( dbNext ?? gitHubNext with { InstallerDownloads = gitHubNext . InstallerDownloads } ) ! ;
117+
118+ var inserts = new List < TagGraph > ( ) ;
119+ var updates = new List < TagGraph > ( ) ;
120+
121+ if ( dbMain is null )
122+ {
123+ inserts . Add ( mergedMain ) ;
124+ }
125+ else
126+ {
127+ updates . Add ( mergedMain ) ;
128+ }
129+
130+ if ( dbNext is null )
131+ {
132+ inserts . Add ( mergedNext ) ;
133+ }
134+ else
135+ {
136+ updates . Add ( mergedNext ) ;
137+ }
138+
139+ if ( inserts . Any ( ) )
140+ {
141+ _tagServices . Create ( inserts ) ;
142+ }
143+ if ( updates . Any ( ) )
144+ {
145+ _tagServices . Update ( updates ) ;
146+ }
147+ }
148+ }
0 commit comments